feat: iotdb

master
hbr 2024-05-27 16:22:20 +08:00
parent 2f6bf2f31d
commit 35e32767db
17 changed files with 1102 additions and 0 deletions

View File

@@ -216,6 +216,7 @@
<module>zhiLian-modules</module>
<module>zhiLian-common</module>
<module>zhiLian-business</module>
<module>zhiLian-iotdb</module>
</modules>
<packaging>pom</packaging>

View File

@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.zhiLian</groupId>
        <artifactId>zhiLian-server</artifactId>
        <version>3.6.3</version>
    </parent>
    <artifactId>zhiLian-iotdb</artifactId>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.iotdb</groupId>
            <artifactId>iotdb-session</artifactId>
            <version>0.14.0-preview1</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.bwie</groupId>
            <artifactId>bwie-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

View File

@@ -0,0 +1,19 @@
package com.zhiLian.iotdb;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * BingRui.Hou
 *
 * @Description Boot entry point for the zhiLian-iotdb module
 * @ClassName MuyuIotdbApplication
 * @Date 2024/05/21 17:01
 */
@SpringBootApplication
public class MuyuIotdbApplication {
    public static void main(String[] args) {
        SpringApplication.run(MuyuIotdbApplication.class);
    }
}

View File

@@ -0,0 +1,187 @@
package com.zhiLian.iotdb.config;

import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;

/**
 * description: IoTDB session helper.
 * Example paths: device root.a1eaKSRpRty.CA3013A303A25467,
 * timeseries root.a1eaKSRpRty.CA3013A303A25467.heart, storage group root.a1eaKSRpRty
 * author: zhouhong
 */
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
    private static Session session;
    private static final String LOCAL_HOST = "111.229.102.61";

    @Bean
    public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
        if (session == null) {
            log.info("Connecting to IoTDB...");
            session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
            session.open(false);
            session.setFetchSize(100);
            log.info("IoTDB connected.");
            // Set the session time zone
            session.setTimeZone("+08:00");
        }
        return session;
    }

    /**
     * description: insertRecord - insert a single record whose values share one data type
     * author: zhouhong
     * @param deviceId device path, e.g. root.a1eaKSRpRty.CA3013A303A25467
     * @param time record timestamp
     * @param measurementsList measurement names
     * @param type TSDataType: BOOLEAN((byte)0), INT32((byte)1), INT64((byte)2), FLOAT((byte)3), DOUBLE((byte)4), TEXT((byte)5), VECTOR((byte)6)
     * @param valuesList values, one per measurement
     */
    public void insertRecordType(String deviceId, Long time, List<String> measurementsList, TSDataType type, List<Object> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
        if (measurementsList.size() != valuesList.size()) {
            throw new ServerException("measurementsList and valuesList sizes do not match");
        }
        List<TSDataType> types = new ArrayList<>();
        measurementsList.forEach(item -> types.add(type));
        session.insertRecord(deviceId, time, measurementsList, types, valuesList);
    }

    /**
     * description: insertRecord - insert a single record; values are passed as strings
     * author: zhouhong
     * @param deviceId device path, e.g. root.a1eaKSRpRty.CA3013A303A25467
     * @param time record timestamp
     * @param measurementsList measurement names
     * @param valuesList values, one per measurement
     */
    public void insertRecord(String deviceId, Long time, List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
        if (measurementsList.size() == valuesList.size()) {
            session.insertRecord(deviceId, time, measurementsList, valuesList);
        } else {
            log.error("measurementsList and valuesList sizes do not match");
        }
    }

    /**
     * description: batch insert multiple records
     * author: zhouhong
     */
    public void insertRecords(List<String> deviceIdList, List<Long> timeList, List<List<String>> measurementsList, List<List<String>> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
        if (measurementsList.size() == valuesList.size()) {
            session.insertRecords(deviceIdList, timeList, measurementsList, valuesList);
        } else {
            log.error("measurementsList and valuesList sizes do not match");
        }
    }

    /**
     * description: insert via a Tablet (better write throughput)
     * author: zhouhong
     * @param deviceId device path, e.g. root.a1eaKSRpRty.CA3013A303A25467
     * @param time record timestamp
     * @param schemaList measurement schemas, e.g. List<MeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64));
     * @param valueList values, one per schema
     * @param maxRowNumber maximum number of rows the tablet holds before being flushed
     */
    public void insertTablet(String deviceId, Long time, List<MeasurementSchema> schemaList, List<Object> valueList, int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException {
        Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
        // Append one row to the tablet
        int rowIndex = tablet.rowSize++;
        tablet.addTimestamp(rowIndex, time);
        for (int i = 0; i < valueList.size(); i++) {
            tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i));
        }
        // Flush when the tablet is full
        if (tablet.rowSize == tablet.getMaxRowNumber()) {
            session.insertTablet(tablet, true);
            tablet.reset();
        }
        // Flush any remaining rows
        if (tablet.rowSize != 0) {
            session.insertTablet(tablet);
            tablet.reset();
        }
    }

    /**
     * description: execute a raw SQL query
     * author: zhouhong
     */
    public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
        return session.executeQueryStatement(sql);
    }

    /**
     * description: delete a storage group, e.g. root.a1eaKSRpRty
     * author: zhouhong
     * @param groupName storage group path
     */
    public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroup(groupName);
    }

    /**
     * description: delete a timeseries, e.g. root.a1eaKSRpRty.CA3013A303A25467.breath
     * author: zhouhong
     */
    public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseries);
    }

    /**
     * description: delete multiple timeseries
     * author: zhouhong
     */
    public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteTimeseries(timeseriesList);
    }

    /**
     * description: delete multiple storage groups
     * author: zhouhong
     */
    public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteStorageGroups(storageGroupList);
    }

    /**
     * description: delete data under a path up to endTime
     * author: zhouhong
     */
    public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(path, endTime);
    }

    /**
     * description: delete data under multiple paths up to endTime
     * author: zhouhong
     */
    public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, endTime);
    }

    /**
     * description: delete data under multiple paths within [startTime, endTime]
     * author: zhouhong
     */
    public void deleteDataByPathListAndTime(List<String> pathList, Long startTime, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
        session.deleteData(pathList, startTime, endTime);
    }
}
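
A minimal usage sketch for the session helper above (the injection point, device path, and values are illustrative, not part of this commit):

    @Resource
    private IotDBSessionConfig iotDb;

    public void reportVitals() throws Exception {
        // Hypothetical device path following the root.bizkey.<PK>.<SN> convention used later in this commit
        String deviceId = "root.bizkey.PK001.SN001";
        iotDb.insertRecord(deviceId, System.currentTimeMillis(),
                List.of("heart", "breath"), List.of("72", "18"));
        // Read the data back; IoTDB SQL accepts relative time filters such as now() - 1h
        SessionDataSet dataSet = iotDb.query("select * from root.bizkey.PK001.SN001 where time >= now() - 1h");
        while (dataSet.hasNext()) {
            System.out.println(dataSet.next());
        }
    }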

View File

@@ -0,0 +1,108 @@
package com.zhiLian.iotdb.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @date 2022/10/31 18:05
 * Kafka consumer configuration; the values are read from application.yml.
 */
@SpringBootConfiguration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.consumer.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Whether offsets are committed automatically (default true); set to false and commit manually to avoid duplicates and data loss
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Interval between automatic offset commits; effective only when auto-commit is enabled
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        // What to do when there is no committed offset for a partition, or the committed offset is invalid:
        // earliest: consume from the committed offset when present, otherwise from the beginning of the partition
        // latest: consume from the committed offset when present, otherwise only records produced after the consumer started
        // none: consume from committed offsets when every partition has one; throw an exception if any partition lacks one
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Maximum interval between two poll() calls (default 5 minutes); exceeding it triggers a rebalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        // Maximum number of records a single poll() returns (default 500); fewer are returned when fewer are available.
        // The default can be too large: if the consumer cannot process 500 records within 5 minutes a rebalance is triggered,
        // the batch is reassigned to another consumer, still cannot be finished, and so is never processed.
        // Estimate the worst-case time per record in advance and size max.poll.records accordingly.
        // Note: this applies to batch listening (BatchListener); without BatchListener the rebalance pattern above does not occur.
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // How long the broker waits without receiving a consumer heartbeat before triggering a rebalance (default 10s)
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // Json deserialization is recommended; entity classes can be transferred without extra configuration
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        // Trusted packages for the consumer's Json deserializer; required to deserialize entity classes
        try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
            deserializer.trustedPackages("*");
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
        }
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of threads in the listener container, typically machines * partitions
        factory.setConcurrency(concurrency);
        // By default an error is raised when a listened topic does not exist; set to false to ignore it
        factory.setMissingTopicsFatal(missingTopicsFatal);
        // Auto-commit is disabled, so manual message acknowledgment is required
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        // Enable batch listening; the listener must then receive a List
        // factory.setBatchListener(true);
        return factory;
    }
}

View File

@@ -0,0 +1,84 @@
package com.zhiLian.iotdb.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* KafkaTemplate
*/
@SpringBootConfiguration
public class KafkaProviderConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.transaction-id-prefix}")
private String transactionIdPrefix;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.retries}")
private String retries;
@Value("${spring.kafka.producer.batch-size}")
private String batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private String bufferMemory;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
//acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
//开启事务必须设为all
props.put(ProducerConfig.ACKS_CONFIG, acks);
//发生错误后消息重发的次数开启事务必须大于0
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
//批次的大小可以通过batch.size 参数设置.默认是16KB
//较小的批次大小有可能降低吞吐量批次大小为0则完全禁用批处理
//比如说kafka里的消息5秒钟Batch才凑满了16KB才能发送出去。那这些消息的延迟就是5秒钟
//实测batchSize这个参数没有用
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
//即使数据没达到16KB,也将这个批次发送出去
props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
//生产者内存缓冲区的大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//反序列化,和生产者的序列化方式对应
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<Object, Object> producerFactory() {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//开启事务,会导致 LINGER_MS_CONFIG 配置失效
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
@Bean
public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
/**
* kafkaTemplate
* @return
*/
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
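
Because transaction-id-prefix is configured on the factory above, a bare kafkaTemplate.send() outside a transaction fails with an IllegalStateException. A minimal sketch of the two usual ways to send (topic, key, and payload are illustrative):

    // Run the send inside a local Kafka transaction
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("bwie-topic", "key", "payload");
        return true;
    });

    // Or call the send from a method annotated with
    // @Transactional(transactionManager = "kafkaTransactionManager")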

View File

@@ -0,0 +1,38 @@
package com.zhiLian.iotdb.config;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author
* @date 2022/10/31 15:41
* kafka
*/
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@PostConstruct
public void init() {
this.kafkaTemplate.setProducerListener(this);
}
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息发送成功:" + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());
}
}

View File

@@ -0,0 +1,34 @@
package com.zhiLian.iotdb.config;

import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @date 2022/10/31 15:27
 * Custom error handler for @KafkaListener consumers.
 */
@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {
        return new Object();
    }

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        System.out.println("Message details: " + message);
        System.out.println("Exception: " + exception);
        System.out.println("Consumer group metadata: " + consumer.groupMetadata());
        System.out.println("Topics: " + consumer.listTopics());
        return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
    }
}

View File

@@ -0,0 +1,71 @@
package com.zhiLian.iotdb.consumer;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @ClassName: HelloWorldKafkaConsumer
 * @Description: Kafka consumer examples
 * @Author: zhuwenqiang
 * @Date: 2023/11/2
 */
@Component
@Log4j2
public class HelloWorldKafkaConsumer {
    @KafkaListener(topics = { "bwie-topic" })
    public void helloWorldKafkaConsumer(ConsumerRecord<String, String> record1) {
        // String key = record1.key();
        // String value = record1.value();
        // Create a Properties object holding the raw Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "122.51.111.225:9092");
        // Key and value deserializers
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "bw2");
        // Create the raw Kafka consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to the topic
        kafkaConsumer.subscribe(Collections.singleton("bwie-topic"));
        while (true) {
            // Poll for messages
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(2000));
            // Iterate over the batch
            records.forEach(record -> {
                String key = record.key();
                String value = record.value();
                System.out.println("Consumer received message, key: " + key + ", value: " + value);
            });
        }
        // log.info("Consumer received message, key: {}, value: {}", key, value);
    }

    @KafkaListener(topics = {"topic1", "topic2"},
            containerFactory = "kafkaListenerContainerFactory",
            errorHandler = "myKafkaListenerErrorHandler")
    public void helloWorldKafkaConsumer(ConsumerRecord<String, String> record,
                                        Acknowledgment acknowledgment) {
        String key = record.key();
        String value = record.value();
        log.info("Consumer received message, key: {}, value: {}", key, value);
        // Manual acknowledgment
        acknowledgment.acknowledge();
    }
}

View File

@@ -0,0 +1,15 @@
package com.zhiLian.iotdb.consumer;

/**
 * @ClassName: KafkaConsumerQuickStart
 * @Description: placeholder for a plain Kafka consumer quick start
 * @Author: zhuwenqiang
 * @Date: 2023/11/2
 */
public class KafkaConsumerQuickStart {
    public static void main(String[] args) {
    }
}

View File

@@ -0,0 +1,114 @@
package com.zhiLian.iotdb.controller;

import com.bwie.common.result.Result;
import com.zhiLian.iotdb.config.IotDBSessionConfig;
import com.zhiLian.iotdb.domain.IotDbParam;
import com.zhiLian.iotdb.service.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.Properties;

/**
 * BingRui.Hou
 *
 * @Description REST endpoints for IoTDB device data, plus a raw Kafka producer demo
 * @ClassName IotDbController
 * @Date 2024/05/21 16:55
 * date: 2022/8/15 21:50
 * author: zhouhong
 */
@Log4j2
@RestController
public class IotDbController {
    @Resource
    private IotDbServer iotDbServer;
    @Resource
    private IotDBSessionConfig iotDBSessionConfig;

    /**
     * Insert a device record.
     * @param iotDbParam reported device data
     */
    @PostMapping("/api/device/insert")
    public Result insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
        iotDbServer.insertData(iotDbParam);
        return Result.success();
    }

    @GetMapping("getMsg")
    public void getMsg(@RequestParam("msg") String msg) {
        // Send a message to Kafka using a raw producer
        // Producer configuration
        Properties properties = new Properties();
        // Broker address
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "122.51.111.225:9092");
        // Key and value serializers
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Acknowledgment level
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // Retries
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        // Message compression
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // Build the record
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("bwie-topic", "key", msg);
        // kafkaProducer.send(producerRecord); // fire-and-forget alternative
        // Asynchronous send with a callback
        kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                // Partition the message landed in
                int partition = metadata.partition();
                // Offset of the message
                long offset = metadata.offset();
                // Topic
                String topic = metadata.topic();
                System.out.println("Send succeeded, partition: " + partition + ", offset: " + offset + ", topic: " + topic);
            } else {
                System.out.println("Send failed: " + exception);
            }
        });
        // Close the producer (flushes pending sends)
        kafkaProducer.close();
    }

    /**
     * Query device records by PK/SN and time range.
     * @param iotDbParam query parameters
     */
    @PostMapping("/api/device/queryData")
    public Result queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
        return Result.success(iotDbServer.queryDataFromIotDb(iotDbParam));
    }

    /**
     * Delete the demo storage groups.
     */
    @PostMapping("/api/device/deleteGroup")
    public Result deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
        iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
        iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
        return Result.success();
    }
}
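
A sample call to the insert endpoint (port 9009 comes from the application config at the end of this commit; the field values are illustrative):

    POST http://localhost:9009/api/device/insert
    Content-Type: application/json

    {"pk": "PK001", "sn": "SN001", "time": 1716300000000, "heart": "72", "breath": "18"}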

View File

@@ -0,0 +1,43 @@
package com.zhiLian.iotdb.domain;

import lombok.Data;

/**
 * BingRui.Hou
 *
 * @Description Device report and query parameters
 * @ClassName IotDbParam
 * @Date 2024/05/21 16:06
 */
@Data
public class IotDbParam {
    /**
     * Product key (PK)
     */
    private String pk;
    /**
     * Device serial number (SN)
     */
    private String sn;
    /**
     * Report timestamp (ms)
     */
    private Long time;
    /**
     * Breath rate
     */
    private String breath;
    /**
     * Heart rate
     */
    private String heart;
    /**
     * Query start time
     */
    private String startTime;
    /**
     * Query end time
     */
    private String endTime;
}

View File

@@ -0,0 +1,36 @@
package com.zhiLian.iotdb.domain;

import lombok.Data;

/**
 * BingRui.Hou
 *
 * @Description Query result row
 * @ClassName IotDbResult
 * @Date 2024/05/21 16:10
 */
@Data
public class IotDbResult {
    /**
     * Record time
     */
    private String time;
    /**
     * Product key (PK)
     */
    private String pk;
    /**
     * Device serial number (SN)
     */
    private String sn;
    /**
     * Breath rate
     */
    private String breath;
    /**
     * Heart rate
     */
    private String heart;
}

View File

@@ -0,0 +1,26 @@
package com.zhiLian.iotdb.producer;

/**
 * @ClassName: KafkaProducerQuickStart
 * @Description: placeholder for a plain Kafka producer quick start
 * @Author: zhuwenqiang
 * @Date: 2023/11/2
 */
public class KafkaProducerQuickStart {
    /**
     * Messaging models:
     *
     * point-to-point: 1 : 1
     *
     * publish/subscribe: 1 : N
     *
     * @param args
     */
    public static void main(String[] args) {
    }
}

View File

@@ -0,0 +1,17 @@
package com.zhiLian.iotdb.service;

import com.zhiLian.iotdb.domain.IotDbParam;

/**
 * BingRui.Hou
 *
 * @Description IoTDB device data service
 * @ClassName IotDbServer
 * @Date 2024/05/21 16:56
 */
public interface IotDbServer {
    void insertData(IotDbParam iotDbParam);

    Object queryDataFromIotDb(IotDbParam iotDbParam);
}

View File

@@ -0,0 +1,132 @@
package com.zhiLian.iotdb.service.impl;

import com.zhiLian.iotdb.config.IotDBSessionConfig;
import com.zhiLian.iotdb.domain.IotDbParam;
import com.zhiLian.iotdb.domain.IotDbResult;
import com.zhiLian.iotdb.service.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.rmi.ServerException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * BingRui.Hou
 *
 * @Description IoTDB device data service implementation
 * @ClassName IotDbServerImpl
 * @Date 2024/05/21 16:55
 */
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
    @Resource
    private IotDBSessionConfig iotDBSessionConfig;

    @Override
    public void insertData(IotDbParam iotDbParam) {
        // iotDbParam carries a simulated device report
        // bizkey: unique business key; PK: product key; SN: device serial number
        String deviceId = "root.bizkey." + iotDbParam.getPk() + "." + iotDbParam.getSn();
        // Persist the reported data into the time-series database
        List<String> measurementsList = new ArrayList<>();
        measurementsList.add("heart");
        measurementsList.add("breath");
        List<String> valuesList = new ArrayList<>();
        valuesList.add(String.valueOf(iotDbParam.getHeart()));
        valuesList.add(String.valueOf(iotDbParam.getBreath()));
        try {
            iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
        } catch (IoTDBConnectionException | StatementExecutionException | ServerException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) {
        List<IotDbResult> iotDbResultList = new ArrayList<>();
        if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
            String sql = "select * from root.bizkey." + iotDbParam.getPk() + "." + iotDbParam.getSn() + " where time >= "
                    + iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
            SessionDataSet sessionDataSet;
            try {
                sessionDataSet = iotDBSessionConfig.query(sql);
            } catch (StatementExecutionException | IoTDBConnectionException e) {
                throw new RuntimeException(e);
            }
            List<String> columnNames = sessionDataSet.getColumnNames();
            List<String> titleList = new ArrayList<>();
            // Skip the Time column so the measurement names line up with the fields below
            for (int i = 1; i < columnNames.size(); i++) {
                String[] temp = columnNames.get(i).split("\\.");
                titleList.add(temp[temp.length - 1]);
            }
            // Assemble the result rows
            try {
                packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
            } catch (StatementExecutionException | IoTDBConnectionException e) {
                throw new RuntimeException(e);
            }
        } else {
            log.info("PK and SN must not be empty");
        }
        return iotDbResultList;
    }

    /**
     * Assemble rows from the result set into IotDbResult objects.
     * @param iotDbParam original query parameters
     * @param iotDbResultList output list
     * @param sessionDataSet IoTDB result set
     * @param titleList measurement names, Time column excluded
     * @throws StatementExecutionException
     * @throws IoTDBConnectionException
     */
    private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
            throws StatementExecutionException, IoTDBConnectionException {
        int fetchSize = sessionDataSet.getFetchSize();
        if (fetchSize > 0) {
            while (sessionDataSet.hasNext()) {
                IotDbResult iotDbResult = new IotDbResult();
                RowRecord next = sessionDataSet.next();
                List<Field> fields = next.getFields();
                String timeString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
                iotDbResult.setTime(timeString);
                Map<String, String> map = new HashMap<>();
                for (int i = 0; i < fields.size(); i++) {
                    Field field = fields.get(i);
                    // Values must be read according to their data type
                    map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
                }
                iotDbResult.setPk(iotDbParam.getPk());
                iotDbResult.setSn(iotDbParam.getSn());
                iotDbResult.setHeart(map.get("heart"));
                iotDbResult.setBreath(map.get("breath"));
                iotDbResultList.add(iotDbResult);
            }
        }
    }
}
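
For reference, with pk=PK001 and sn=SN001 (illustrative values) the service above writes to and reads from:

    insert: deviceId = root.bizkey.PK001.SN001, measurements [heart, breath]
    query:  select * from root.bizkey.PK001.SN001 where time >= <startTime> and time < <endTime>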

View File

@@ -0,0 +1,102 @@
# Tomcat
server:
  port: 9009
# Spring
spring:
  kafka:
    producer:
      # Kafka servers
      bootstrap-servers: 122.51.111.225:9092
      # Enables transactions; sends must run inside a transaction or an error is thrown
      transaction-id-prefix: kafkaTx-
      # Number of retries after a send error; must be greater than 0 when transactions are enabled
      retries: 3
      # acks=0: the producer does not wait for any response from the server before considering the write successful
      # acks=1: the producer receives a success response once the partition leader has received the message
      # acks=all: the producer receives a success response only after all replicating nodes have received the message
      # Must be all when transactions are enabled
      acks: all
      # When several messages target the same partition the producer batches them; this sets the batch size in bytes
      batch-size: 16384
      # Size of the producer's memory buffer
      buffer-memory: 1024000
      # Key serializer
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Value serializer; Json is recommended so entity classes can be transferred without extra configuration
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      # Kafka servers
      bootstrap-servers: 122.51.111.225:9092
      group-id: firstGroup
      # Auto-commit interval; in Spring Boot 2.x this is a Duration and must match a format such as 1S, 1M, 2H, 5D
      #auto-commit-interval: 2s
      # What to do when there is no committed offset for a partition, or the committed offset is invalid:
      # earliest: consume from the committed offset when present, otherwise from the beginning of the partition
      # latest: consume from the committed offset when present, otherwise only records produced after the consumer started
      # none: consume from committed offsets when every partition has one; throw an exception if any partition lacks one
      auto-offset-reset: latest
      # Whether offsets are committed automatically (default true); set to false and commit manually to avoid duplicates and data loss
      enable-auto-commit: false
      # Key deserializer
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Value deserializer; Json is recommended so entity classes can be transferred without extra configuration
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Maximum number of records a single poll() returns (default 500); fewer are returned when fewer are available.
      # The default can be too large: if the consumer cannot process 500 records within 5 minutes a rebalance is triggered,
      # the batch is reassigned to another consumer, still cannot be finished, and so is never processed.
      # Estimate the worst-case time per record in advance and size max.poll.records accordingly.
      # Note: this applies to batch listening (BatchListener); without BatchListener the rebalance pattern above does not occur.
      max-poll-records: 3
      properties:
        # Trusted packages for the consumer's Json deserializer; required to deserialize entity classes
        spring:
          json:
            trusted:
              packages: "*"
        # Maximum interval between two poll() calls (default 5 minutes); exceeding it triggers a rebalance
        max:
          poll:
            interval:
              ms: 600000
        # How long the broker waits without receiving a consumer heartbeat before triggering a rebalance (default 10s)
        session:
          timeout:
            ms: 10000
    listener:
      # Number of threads in the listener container, typically machines * partitions
      concurrency: 4
      # Auto-commit is disabled, so manual message acknowledgment is required
      ack-mode: manual_immediate
      # By default an error is raised when a listened topic does not exist; set to false to ignore it
      missing-topics-fatal: false
      # Poll timeout of the listener container, in milliseconds
      poll-timeout: 600000
  main:
    allow-circular-references: true # allow circular references
  jackson: # JSON serialization and deserialization settings
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  application:
    # Application name
    name: bwie-kafka
  profiles:
    # Active environment
    active: dev
  cloud:
    nacos:
      discovery:
        # Service registry address
        server-addr: 122.51.111.225:8848
      config:
        # Configuration center address
        server-addr: 122.51.111.225:8848
        # namespace: a9b66e92-e507-47ba-9674-6f939f793aca
        # Configuration file format
        file-extension: yml
        # Shared configurations
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}