Compare commits
2 Commits
6594363a91
...
3ebd1d2c39
Author | SHA1 | Date |
---|---|---|
|
3ebd1d2c39 | |
|
7100bf71bf |
|
@ -52,6 +52,12 @@
|
|||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- IotDB会话 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataSource -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
|
@ -82,10 +88,10 @@
|
|||
<artifactId>cloud-common-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- IotDB会话 -->
|
||||
<!-- kafka模块-->
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-kafka</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,37 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:11
|
||||
* @Description 基础事件
|
||||
*/
|
||||
public class BasicEvent<T> extends ApplicationEvent {
|
||||
|
||||
/**
|
||||
* 事件携带的数据
|
||||
*/
|
||||
private final T data;
|
||||
|
||||
/**
|
||||
* 构造函数,初始化事件源和数据
|
||||
*
|
||||
* @param source 事件源对象
|
||||
* @param data 事件携带的数据
|
||||
*/
|
||||
public BasicEvent(Object source, T data) {
|
||||
super(source);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取事件携带的数据
|
||||
*
|
||||
* @return 事件数据
|
||||
*/
|
||||
public T getData() {
|
||||
return data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:37
|
||||
* @Description 基础事件处理器
|
||||
*/
|
||||
@Component
|
||||
public class BasicEventHandler<T> implements ApplicationListener<BasicEvent<T>> {
|
||||
|
||||
/**
|
||||
* 具体事件监听器
|
||||
*/
|
||||
private final BasicEventListener<T> listener;
|
||||
|
||||
/**
|
||||
* 构造函数,用于注入具体事件监听器
|
||||
*
|
||||
* @param listener 具体事件监听器
|
||||
*/
|
||||
public BasicEventHandler(BasicEventListener<T> listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理应用事件
|
||||
*
|
||||
* @param event 事件对象
|
||||
*/
|
||||
@Override
|
||||
public void onApplicationEvent(BasicEvent<T> event) {
|
||||
listener.onEvent(event);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/30 15:35
|
||||
* @Description 基础事件监听器
|
||||
*/
|
||||
public interface BasicEventListener<T> {
|
||||
|
||||
/**
|
||||
* 处理事件的方法
|
||||
*
|
||||
* @param event 事件对象
|
||||
*/
|
||||
void onEvent(BasicEvent<T> event);
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.muyu.event.process.basic;
|
||||
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:01
|
||||
* @Description 事件发布者
|
||||
*/
|
||||
@Component
|
||||
public class EventPublisher implements ApplicationEventPublisherAware {
|
||||
|
||||
/**
|
||||
* 应用程序事件发布者,用于发布事件
|
||||
*/
|
||||
private ApplicationEventPublisher publisher;
|
||||
|
||||
/**
|
||||
* 设置应用程序事件发布者
|
||||
*
|
||||
* @param publisher 应用程序事件发布者实例
|
||||
*/
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布事件
|
||||
* @param event 要发布的事件
|
||||
* @param <T> 事件数据类型
|
||||
*/
|
||||
public <T> void publish(BasicEvent<T> event) {
|
||||
publisher.publishEvent(event);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package com.muyu.event.process.consumer;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 16:53
|
||||
* @Description 测试消费者
|
||||
*/
|
||||
@Slf4j
|
||||
//@Component
|
||||
@RequiredArgsConstructor
|
||||
public class TestConsumer implements InitializingBean {
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
*/
|
||||
private final KafkaConsumer<String, String> kafkaConsumer;
|
||||
|
||||
/**
|
||||
* kafka主题名称
|
||||
*/
|
||||
private static final String topicName = "test-topic";
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
new Thread(() -> {
|
||||
log.info("启动线程监听Topic: {}", topicName);
|
||||
ThreadUtil.sleep(1000);
|
||||
Collection<String> topics = Lists.newArrayList(topicName);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
|
||||
consumerRecords.forEach(record -> {
|
||||
String value = record.value();
|
||||
log.info("从Kafka中消费的原始数据: {}", value);
|
||||
});
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
package com.muyu.event.process.consumer;
|
||||
|
||||
import com.muyu.event.process.basic.EventPublisher;
|
||||
import com.muyu.event.process.event.IoTDBInsertDataEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextClosedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 23:23
|
||||
* @Description 车辆消费者
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class VehicleConsumer implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
*/
|
||||
private final KafkaConsumer<String, String> kafkaConsumer;
|
||||
|
||||
/**
|
||||
* 事件发布者
|
||||
*/
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
/**
|
||||
* 协议解析报文传递数据(队列名称)
|
||||
*/
|
||||
public final static String MESSAGE_PARSING = "MessageParsing";
|
||||
|
||||
/**
|
||||
* 设定固定大小的线程池,线程数量与当前可用的处理器核心数相同
|
||||
*/
|
||||
private final ExecutorService executorService =
|
||||
Executors.newFixedThreadPool(10);
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
log.info("启动线程监听Topic: {}", MESSAGE_PARSING);
|
||||
List<String> topics = Collections.singletonList(MESSAGE_PARSING);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));
|
||||
consumerRecords.forEach(consumerRecord -> executorService.submit(() -> handleRecord(consumerRecord)));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
|
||||
String message = consumerRecord.value();
|
||||
log.info("接收到车辆报文数据,内容:{}", message);
|
||||
log.info("------------------------------------------------");
|
||||
eventPublisher.publish(new IoTDBInsertDataEvent(this, message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ContextClosedEvent event) {
|
||||
log.info("关闭线程池和Kafka消费者");
|
||||
|
||||
try {
|
||||
executorService.shutdown();
|
||||
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.error("线程池关闭被中断,强制关闭", e);
|
||||
executorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
try {
|
||||
kafkaConsumer.close(); // 关闭Kafka消费者
|
||||
} catch (Exception e) {
|
||||
log.error("关闭Kafka消费者时发生错误", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package com.muyu.event.process.controller;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.common.core.constant.Constants;
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.core.web.controller.BaseController;
|
||||
import com.muyu.event.process.iotdb.service.TestIoTDBService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 16:24
|
||||
* @Description 测试事件控制层
|
||||
*/
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
@RequestMapping(value = "/test-event")
|
||||
public class TestEventController extends BaseController {
|
||||
|
||||
/**
|
||||
* kafka生产者
|
||||
*/
|
||||
private final KafkaProducer<String, String> kafkaProducer;
|
||||
|
||||
/**
|
||||
* kafka主题名称
|
||||
*/
|
||||
private static final String kafkaTopicName = "test-topic";
|
||||
|
||||
/**
|
||||
* 测试IoTDB业务层
|
||||
*/
|
||||
private final TestIoTDBService testIoTDBService;
|
||||
|
||||
/**
|
||||
* 发送Kafka测试消息
|
||||
*
|
||||
* @return 响应结果
|
||||
*/
|
||||
@GetMapping(value = "/sendKafka")
|
||||
public Result<String> senKafka() {
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("id","1");
|
||||
jsonObject.put("name","张三");
|
||||
jsonObject.put("age","18");
|
||||
jsonObject.put("sex","男");
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopicName, jsonObject.toJSONString());
|
||||
kafkaProducer.send(producerRecord);
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 响应结果
|
||||
*/
|
||||
@GetMapping(value = "/list")
|
||||
public Result<List<Map<String, Object>>> list() {
|
||||
return Result.success(testIoTDBService.list(), Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向IoTDB添加数据
|
||||
*
|
||||
* @return 响应结果
|
||||
*/
|
||||
@PostMapping(value = "/save")
|
||||
public Result<String> save() {
|
||||
String deviceId = "root.test";
|
||||
ArrayList<String> keyList = new ArrayList<>();
|
||||
ArrayList<String> valueList = new ArrayList<>();
|
||||
keyList.add("car_vin");
|
||||
keyList.add("car_name");
|
||||
valueList.add("VIN123456");
|
||||
valueList.add("宝马");
|
||||
testIoTDBService.insertStringRecord(deviceId, System.currentTimeMillis(), keyList, valueList);
|
||||
return Result.success(null, Constants.SUCCESS_MESSAGE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package com.muyu.event.process.event;
|
||||
|
||||
import com.muyu.event.process.basic.BasicEvent;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 21:19
|
||||
* @Description 向IoTDB插入数据事件
|
||||
*/
|
||||
public class IoTDBInsertDataEvent extends BasicEvent<String> {
|
||||
|
||||
/**
|
||||
* 构造函数,向IoTDB插入数据创建事件
|
||||
*
|
||||
* @param messsge 消息
|
||||
*/
|
||||
public IoTDBInsertDataEvent(Object source, String messsge) {
|
||||
super(source, messsge);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.basic.config;
|
||||
package com.muyu.event.process.iotdb.basic.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.session.pool.SessionPool;
|
|
@ -1,7 +1,7 @@
|
|||
package com.muyu.event.process.basic.service;
|
||||
package com.muyu.event.process.iotdb.basic.service;
|
||||
|
||||
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
|
@ -14,9 +14,9 @@ import java.util.Map;
|
|||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 23:37
|
||||
* @Description IoTDB业务层
|
||||
* @Description IoTDB基准业务层
|
||||
*/
|
||||
public interface IoTDBService {
|
||||
public interface IService {
|
||||
|
||||
/**
|
||||
* 插入一个 Tablet 对象到 IoTDB 数据库
|
|
@ -1,11 +1,10 @@
|
|||
package com.muyu.event.process.basic.service.impl;
|
||||
package com.muyu.event.process.iotdb.basic.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.muyu.event.process.basic.config.IoTDBSessionConfig;
|
||||
import com.muyu.event.process.basic.service.IoTDBService;
|
||||
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import com.muyu.event.process.iotdb.basic.config.IoTDBSessionConfig;
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
|
@ -17,6 +16,7 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
|
|||
import org.apache.iotdb.tsfile.write.record.Tablet;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
|
@ -26,17 +26,17 @@ import java.util.stream.Collectors;
|
|||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/28 23:38
|
||||
* @Description IoTDB业务实现层
|
||||
* @Description IoTDB基准业务实现层
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class IoTDBServiceImpl implements IoTDBService {
|
||||
public class ServiceImpl implements IService {
|
||||
|
||||
/**
|
||||
* IoTDB会话配置
|
||||
*/
|
||||
private final IoTDBSessionConfig ioTDBSessionConfig;
|
||||
@Autowired
|
||||
private IoTDBSessionConfig ioTDBSessionConfig;
|
||||
|
||||
/**
|
||||
* 插入一个 Tablet 对象到 IoTDB 数据库
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.domain;
|
||||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
|
@ -1,6 +1,6 @@
|
|||
package com.muyu.event.process.domain;
|
||||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import com.muyu.event.process.domain.dto.IoTDbRecordAble;
|
||||
import com.muyu.event.process.iotdb.domain.dto.IoTDbRecordAble;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.domain;
|
||||
package com.muyu.event.process.iotdb.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.domain.dto;
|
||||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.domain.dto;
|
||||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package com.muyu.event.process.domain.dto;
|
||||
package com.muyu.event.process.iotdb.domain.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
|
@ -0,0 +1,11 @@
|
|||
package com.muyu.event.process.iotdb.service;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:38
|
||||
* @Description IoTDB业务层
|
||||
*/
|
||||
public interface IoTDBService extends IService {
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package com.muyu.event.process.iotdb.service;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.IService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 17:23
|
||||
* @Description 测试IoTDB业务层
|
||||
*/
|
||||
public interface TestIoTDBService extends IService {
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 返回结果
|
||||
*/
|
||||
List<Map<String, Object>> list();
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.muyu.event.process.iotdb.service.impl;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
|
||||
import com.muyu.event.process.iotdb.service.IoTDBService;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:39
|
||||
* @Description IoTDB业务实现层
|
||||
*/
|
||||
@Service
|
||||
public class IoTDBServiceImpl extends ServiceImpl implements IoTDBService {
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.muyu.event.process.iotdb.service.impl;
|
||||
|
||||
import com.muyu.event.process.iotdb.basic.service.impl.ServiceImpl;
|
||||
import com.muyu.event.process.iotdb.service.TestIoTDBService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 17:24
|
||||
* @Description 测试IoTDB业务实现层
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TestIoTDBServiceImpl extends ServiceImpl implements TestIoTDBService {
|
||||
|
||||
/**
|
||||
* 查询IoTDB数据列表
|
||||
* @return 返回结果
|
||||
*/
|
||||
@Override
|
||||
public List<Map<String, Object>> list() {
|
||||
String sql = "select * from root.test";
|
||||
SessionDataSet sessionDataSet = this.executeQueryStatement(sql);
|
||||
List<Map<String, Object>> list = this.packagingMapData(sessionDataSet, sessionDataSet.getColumnTypes());
|
||||
log.info("查询IoTDB数据为:{}", list.toString());
|
||||
return list;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package com.muyu.event.process.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.event.process.basic.BasicEvent;
|
||||
import com.muyu.event.process.basic.BasicEventListener;
|
||||
import com.muyu.event.process.event.IoTDBInsertDataEvent;
|
||||
import com.muyu.event.process.iotdb.service.IoTDBService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author: zi run
|
||||
* @Date 2024/9/29 22:12
|
||||
* @Description 向IoTDB插入数据事件监听器
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class IoTDBInsertDataListener implements BasicEventListener<String> {
|
||||
|
||||
/**
|
||||
* IoTDB业务层
|
||||
*/
|
||||
private final IoTDBService ioTDBService;
|
||||
|
||||
/**
|
||||
* 设备名(表名)
|
||||
*/
|
||||
private static final String DEVICE_ID = "root.vehicle";
|
||||
|
||||
/**
|
||||
* 处理接收到的事件,将数据插入到 IoTDB
|
||||
*
|
||||
* @param event 接收到的事件,包含需要插入的数据
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(BasicEvent<String> event) {
|
||||
JSONObject data = JSONObject.parseObject(event.getData());
|
||||
List<String> keyList = extractKeys(data);
|
||||
List<String> valueList = extractValues(data);
|
||||
ioTDBService.insertStringRecord(DEVICE_ID, System.currentTimeMillis(), keyList, valueList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从给定的JSONObject中提取所有的键
|
||||
*
|
||||
* @param data 要提取键的JSONObject
|
||||
* @return 键的列表
|
||||
*/
|
||||
private List<String> extractKeys(JSONObject data) {
|
||||
return data.keySet().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 从给定的 JSONObject 中提取所有的值,并将其转换为字符串
|
||||
*
|
||||
* @param data 要提取值的JSONObject
|
||||
* @return 值的列表,以字符串形式表示
|
||||
*/
|
||||
private List<String> extractValues(JSONObject data) {
|
||||
return data.values().stream()
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
|
@ -42,4 +42,6 @@ spring:
|
|||
# 系统共享配置
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# 系统环境Config共享配置
|
||||
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# kafka共享配置
|
||||
- application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
Loading…
Reference in New Issue