From 47567ff0554c100ff0de0e77e768609fa39b909b Mon Sep 17 00:00:00 2001 From: xinzirun Date: Sun, 29 Sep 2024 00:55:53 +0800 Subject: [PATCH] =?UTF-8?q?feat():=20=E6=96=B0=E5=A2=9EIoTDB=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E7=B1=BB=E3=80=81=E5=9F=BA=E7=A1=80=E4=B8=9A=E5=8A=A1?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-auth/src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../cloud-modules-data-process/pom.xml | 91 +++ .../process/CloudDataProcessApplication.java | 20 + .../basic/config/IoTDBSessionConfig.java | 72 ++ .../process/basic/service/IoTDBService.java | 110 +++ .../basic/service/impl/IoTDBServiceImpl.java | 714 ++++++++++++++++++ .../muyu/data/process/domain/DataJSON.java | 33 + .../data/process/domain/InsertDataDTO.java | 67 ++ .../data/process/domain/ResultEntity.java | 36 + .../data/process/domain/TestDataType.java | 43 ++ .../process/domain/dto/IoTDbRecordAble.java | 12 + .../dto/MeasurementSchemaValuesDTO.java | 36 + .../src/main/resources/banner.txt | 2 + .../src/main/resources/bootstrap.yml | 45 ++ .../src/main/resources/logback/dev.xml | 74 ++ .../src/main/resources/logback/prod.xml | 81 ++ .../src/main/resources/logback/test.xml | 81 ++ .../cloud-modules-enterprise-server/pom.xml | 1 - .../CloudEnterpriseApplication.java | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/main/resources/bootstrap.yml | 2 +- cloud-modules/pom.xml | 1 + .../src/main/resources/bootstrap.yml | 2 +- pom.xml | 8 + 27 files changed, 1534 insertions(+), 9 deletions(-) create mode 100644 cloud-modules/cloud-modules-data-process/pom.xml create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/CloudDataProcessApplication.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/config/IoTDBSessionConfig.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/DataJSON.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/ResultEntity.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/TestDataType.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/IoTDbRecordAble.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/MeasurementSchemaValuesDTO.java create mode 100644 cloud-modules/cloud-modules-data-process/src/main/resources/banner.txt create mode 100644 cloud-modules/cloud-modules-data-process/src/main/resources/bootstrap.yml create mode 100644 cloud-modules/cloud-modules-data-process/src/main/resources/logback/dev.xml create mode 100644 cloud-modules/cloud-modules-data-process/src/main/resources/logback/prod.xml create mode 100644 cloud-modules/cloud-modules-data-process/src/main/resources/logback/test.xml diff --git a/cloud-auth/src/main/resources/bootstrap.yml b/cloud-auth/src/main/resources/bootstrap.yml index 2bdda14..270c8d6 100644 --- a/cloud-auth/src/main/resources/bootstrap.yml +++ b/cloud-auth/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: application: diff --git a/cloud-gateway/src/main/resources/bootstrap.yml b/cloud-gateway/src/main/resources/bootstrap.yml index 4132cf0..5e54e71 100644 --- a/cloud-gateway/src/main/resources/bootstrap.yml +++ b/cloud-gateway/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-data-process/pom.xml b/cloud-modules/cloud-modules-data-process/pom.xml new file mode 100644 index 0000000..cb4a354 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/pom.xml @@ -0,0 +1,91 @@ + + + 4.0.0 + + com.muyu + cloud-modules + 3.6.3 + + + cloud-modules-data-process + + + 17 + 17 + UTF-8 + + + + cloud-modules-data-process 数据处理 + + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + com.mysql + mysql-connector-j + + + + + com.muyu + cloud-common-datasource + + + + + com.muyu + cloud-common-datascope + + + + + com.muyu + cloud-common-log + + + + + com.muyu + cloud-common-api-doc + + + + + com.muyu + cloud-common-core + + + + + org.apache.iotdb + iotdb-session + + + \ No newline at end of file diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/CloudDataProcessApplication.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/CloudDataProcessApplication.java new file mode 100644 index 0000000..b5ba238 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/CloudDataProcessApplication.java @@ -0,0 +1,20 @@ +package com.muyu.data.process; + +import com.muyu.common.security.annotation.EnableCustomConfig; +import com.muyu.common.security.annotation.EnableMyFeignClients; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @Author: zi run + * @Date 2024/9/28 22:31 + * @Description 数据处理微服启动类 + */ +@EnableCustomConfig +@EnableMyFeignClients +@SpringBootApplication +public class CloudDataProcessApplication { + public static void main(String[] args) { + SpringApplication.run(CloudDataProcessApplication.class, args); + } +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/config/IoTDBSessionConfig.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/config/IoTDBSessionConfig.java new file mode 100644 index 0000000..3bf7de1 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/config/IoTDBSessionConfig.java @@ -0,0 +1,72 @@ +package com.muyu.data.process.basic.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.session.pool.SessionPool; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/28 22:41 + * @Description IoTDB会话配置 + */ +@Slf4j +@Component +@Configuration +public class IoTDBSessionConfig { + + @Value("${spring.iotdb.username}") + private String username; + + @Value("${spring.iotdb.password}") + private String password; + + @Value("${spring.iotdb.ip}") + private String ip; + + @Value("${spring.iotdb.port}") + private int port; + + @Value("${spring.iotdb.maxSize}") + private int maxSize; + + /** + * IoTDB会话池 + */ + private static SessionPool sessionPool = null; + + /** + * 获取IoTDB会话对象 + * @return ioTDB会话对象 + */ + public SessionPool getSessionPool() { + if (sessionPool == null) { + sessionPool = new SessionPool(ip, port, username, password, maxSize); + } + return sessionPool; + } + + /** + * 向IoTDB中插入特定设备的记录 + * + * @param deviceId 设备的唯一标识符 + * @param time 记录的时间戳,以毫秒为单位 + * @param measurements 与记录关联的测量名称列表 + * @param values 每个测量对应的值列表。值的顺序必须与测量名称一一对应 + * + *

该方法从会话池中获取一个会话,并尝试将指定的记录插入到 IoTDB 中。 + * 如果插入失败,将记录错误信息,便于后续排查。

+ */ + public void insertRecord(String deviceId, long time, List measurements, List values) { + getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); + sessionPool.insertRecord(deviceId, time, measurements, values); + } catch (Exception e) { + log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", + deviceId, time, measurements, values, e.getMessage()); + } + } +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java new file mode 100644 index 0000000..0a99fd6 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java @@ -0,0 +1,110 @@ +package com.muyu.data.process.basic.service; + +import com.muyu.data.process.domain.dto.IoTDbRecordAble; +import com.muyu.data.process.domain.dto.MeasurementSchemaValuesDTO; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; +import java.util.Map; + +/** + * @Author: zi run + * @Date 2024/9/28 23:37 + * @Description IoTDB业务层 + */ +public interface IoTDBService { + + /** + * 插入一个 Tablet 对象到 IoTDB 数据库 + * + * @param tablet 要插入的 Tablet 对象,包含待写入的数据 + */ + void insertTablet(Tablet tablet); + + void insertTablets(Map tablets); + + void insertStringRecord(String deviceId, long time, List measurements, List values); + + void insertRecord(String deviceId, long time, List measurements, List types, List values); + + void insertStringRecords(List deviceIds, List times, List> measurementsList, List> valuesList); + + void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList); + + void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList); + + void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList); + + void deleteData(String path, long endTime); + + void deleteData(List paths, long endTime); + + SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long timeOut); + + List executeRawDataQuery(List paths, long startTime, long endTime, long timeOut, Class clazz); + + SessionDataSet executeLastDataQuery(List paths, long lastTime); + + List executeLastDataQuery(List paths, long lastTime, Class clazz); + + SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes); + + List executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes, Class clazz); + + SessionDataSet executeAggregationQuery(List paths, List aggregations); + + SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime); + + SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval); + + SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep); + + SessionDataSet executeQueryStatement(String sql); + + + /** + * SQL非查询 + * + * @param sql + */ + void executeNonQueryStatement(String sql); + + /** + * 封装处理数据 + * + * @param sessionDataSet + * @param columnNames + */ + List> packagingMapData(SessionDataSet sessionDataSet, List columnNames); + + /** + * 封装处理数据(不支持聚合查询) + * + * @param sessionDataSet 查询返回的结果集 + * @param titleList 查询返回的结果集内的字段名 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @param + * @return + */ + List packagingObjectData(SessionDataSet sessionDataSet, List titleList, Class clazz); + + /** + * 根据对象构建MeasurementSchemas + * + * @param object 对象 + * @return + */ + List buildMeasurementSchemas(Object object); + + /** + * 根据对象构建MeasurementSchemaValuesDTO + * + * @param object 对象 + * @return + */ + MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object); +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java new file mode 100644 index 0000000..d551a1a --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java @@ -0,0 +1,714 @@ +package com.muyu.data.process.basic.service.impl; + +import com.alibaba.fastjson.JSON; +import com.muyu.data.process.basic.config.IoTDBSessionConfig; +import com.muyu.data.process.basic.service.IoTDBService; +import com.muyu.data.process.domain.dto.IoTDbRecordAble; +import com.muyu.data.process.domain.dto.MeasurementSchemaValuesDTO; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Type; +import java.util.*; +import java.util.stream.Collectors; + +/** + * @Author: zi run + * @Date 2024/9/28 23:38 + * @Description IoTDB业务实现层 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class IoTDBServiceImpl implements IoTDBService { + + /** + * IoTDB会话配置 + */ + private final IoTDBSessionConfig ioTDBSessionConfig; + + /** + * 插入一个 Tablet 对象到 IoTDB 数据库 + * + * @param tablet 要插入的 Tablet 对象,包含待写入的数据 + */ + @Override + public void insertTablet(Tablet tablet) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:tablet:[{}]", tablet); + sessionPool.insertTablet(tablet); + } catch (Exception e) { + log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage()); + } + } + + /** + * 将给定的 Tablets 插入到 IoTDB 数据库中。 + * + * @param tablets 一个 Map,包含要插入的 Tablets,键为 String 类型,值为 Tablet 对象。 + * Tablets 应该已经准备好并符合 IoTDB 的插入要求。 + */ + @Override + public void insertTablets(Map tablets) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:tablets:[{}]", tablets); + sessionPool.insertTablets(tablets); + } catch (Exception e) { + log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage()); + } + } + + /** + * 单条数据插入(string类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param values 数据项对应值列表 + */ + @Override + public void insertStringRecord(String deviceId, long time, List measurements, List values) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values); + sessionPool.insertRecord(deviceId, time, measurements, values); + } catch (Exception e) { + log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}", + deviceId, time, measurements, values, e.getMessage()); + } + } + + /** + * 单条数据插入(不同类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param types 数据项对应类型列表 + * @param values 数据项对应值列表 + */ + @Override + public void insertRecord(String deviceId, long time, List measurements, List types, List values) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values); + sessionPool.insertRecord(deviceId, time, measurements, types, values); + } catch (Exception e) { + log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}", + deviceId, time, measurements, types, values, e.getMessage()); + } + } + + + /** + * 多个设备多条数据插入(string类型数据项) + * + * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertStringRecords(List deviceIds, List times, List> measurementsList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList); + sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}", + deviceIds, times, measurementsList, valuesList, e.getMessage()); + } + } + + /** + * 多个设备多条数据插入(不同类型数据项) + * + * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList); + sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", + deviceIds, times, measurementsList, typesList, valuesList, e.getMessage()); + } + } + + /** + * 单个设备多条数据插入(string类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList); + sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}", + deviceId, times, measurementsList, valuesList, e.getMessage()); + } + } + + /** + * 单个设备多条数据插入(不同类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + @Override + public void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList); + sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); + } catch (Exception e) { + log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); + } + } + + /** + * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) + * + * @param path 单个字段 root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + @Override + public void deleteData(String path, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime); + sessionPool.deleteData(path, endTime); + } catch (Exception e) { + log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage()); + } + } + + /** + * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ + @Override + public void deleteData(List paths, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime); + sessionPool.deleteData(paths, endTime); + } catch (Exception e) { + log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage()); + } + } + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param outTime 超时时间 + * @return SessionDataSet (Time,paths) + */ + @Override + public SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime); + sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param outTime 超时时间 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @param + * @return + */ + @Override + public List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, Class clazz) { + SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); + } + return resultEntities; + } + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @return SessionDataSet + */ + @Override + public SessionDataSet executeLastDataQuery(List paths, long lastTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime); + sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return + */ + @Override + public List executeLastDataQuery(List paths, long lastTime, Class clazz) { + SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage()); + } + return resultEntities; + } + + /** + * 最新点查询(快速查询单设备下指定序列最新点) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @return SessionDataSet + */ + @Override + public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes); + sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @param + * @return + */ + @Override + public List executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes, Class clazz) { + SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); + List columnNames = sessionDataSet.getColumnNames(); + List resultEntities = null; + try { + resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); + } catch (Exception e) { + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + } + return resultEntities; + } + + /** + * 聚合查询 + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", paths, aggregations, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", paths, aggregations, startTime, endTime); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval + * @param slidingStep + * @return SessionDataSet + */ + @Override + public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + + try { + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * SQL查询 + * + * @param sql + * @return + */ + @Override + public SessionDataSet executeQueryStatement(String sql) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + SessionDataSetWrapper sessionDataSetWrapper = null; + + try { + log.info("iotdb SQL查询:sql:[{}]", sql); + sessionDataSetWrapper = sessionPool.executeQueryStatement(sql); + return sessionDataSetWrapper.getSessionDataSet(); + } catch (Exception e) { + log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); + } finally { + sessionPool.closeResultSet(sessionDataSetWrapper); + } + return null; + } + + /** + * SQL非查询 + * + * @param sql + */ + @Override + public void executeNonQueryStatement(String sql) { + SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); + try { + log.info("iotdb SQL无查询:sql:[{}]", sql); + sessionPool.executeNonQueryStatement(sql); + } catch (Exception e) { + log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage()); + } + } + + /** + * 封装处理数据 + * + * @param sessionDataSet + * @param titleList + */ + @SneakyThrows + @Override + public List> packagingMapData(SessionDataSet sessionDataSet, List titleList) { + int fetchSize = sessionDataSet.getFetchSize(); + List> resultList = new ArrayList<>(); + titleList.remove("Time"); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + Map resultMap = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + resultMap.put("time", timeString); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { + resultMap.put(splitString(titleList.get(i)), null); + } else { + resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString()); + } + } + resultList.add(resultMap); + } + } + return resultList; + } + + /** + * 封装处理数据(不支持聚合查询) + * + * @param sessionDataSet 查询返回的结果集 + * @param titleList 查询返回的结果集内的字段名 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @param + * @return + */ + @SneakyThrows + @Override + public List packagingObjectData(SessionDataSet sessionDataSet, List titleList, Class clazz) { + int fetchSize = sessionDataSet.getFetchSize(); + List resultList = new ArrayList<>(); + titleList.remove("Time"); + if (fetchSize > 0) { + while (sessionDataSet.hasNext()) { + Map resultMap = new HashMap<>(); + RowRecord next = sessionDataSet.next(); + List fields = next.getFields(); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + resultMap.put("time", timeString); + if (titleList.stream().anyMatch(str -> str.contains("."))) { + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + String title = titleList.get(i); + if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { + resultMap.put(splitString(title), null); + } else { + resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString()); + } + } + } else { + Field fieldName = fields.get(0); + Field fieldValue = fields.get(1); + Field fieldDataType = fields.get(2); + if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) { + String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString(); + Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()); + resultMap.put(splitString(mapKey), mapValue); + } + } + + String jsonString = JSON.toJSONString(resultMap); + resultList.add(JSON.parseObject(jsonString, (Type) clazz)); + } + } + return resultList; + } + + /** + * 分割获取字段名 + * + * @param str + * @return 字段名 + */ + public static String splitString(String str) { + String[] parts = str.split("\\."); + if (parts.length <= 0) { + return str; + } else { + return parts[parts.length - 1]; + } + } + + /** + * 根据数据值和数据类型返回对应数据类型数据 + * + * @param value 数据值 + * @param typeName 数据类型 + * @return 转换后的数据值 + */ + public static Object convertStringToType(String value, String typeName) { + String type = typeName.toLowerCase(); + if (type.isEmpty()) { + return value; + } + if ("boolean".equals(type)) { + return Boolean.parseBoolean(value); + } else if ("double".equals(type)) { + return Double.parseDouble(value); + } else if ("int32".equals(type)) { + return Integer.parseInt(value); + } else if ("int64".equals(type)) { + return Long.parseLong(value); + } else if ("float".equals(type)) { + return Float.parseFloat(value); + } else if ("text".equals(type)) { + return value; + } else { + return value; + } + } + + /** + * 根据对象属性的数据类型返回对应的TSDataType + * + * @param type 属性的数据类型 + * @return TSDataType + */ + public static TSDataType getTsDataTypeByString(String type) { + String typeName = splitString(type).toLowerCase(); + if ("boolean".equals(typeName)) { + return TSDataType.BOOLEAN; + } else if ("double".equals(typeName)) { + return TSDataType.DOUBLE; + } else if ("int".equals(typeName) || "integer".equals(typeName)) { + return TSDataType.INT32; + } else if ("long".equals(typeName)) { + return TSDataType.INT64; + } else if ("float".equals(typeName)) { + return TSDataType.FLOAT; + } else if ("text".equals(typeName)) { + return TSDataType.TEXT; + } else if ("string".equals(typeName)) { + return TSDataType.TEXT; + } else { + return TSDataType.UNKNOWN; + } + } + + /** + * 根据对象构建MeasurementSchemas + * + * @param obj 对象 + * @return + */ + @Override + public List buildMeasurementSchemas(Object obj) { + java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); + List schemaList = Arrays.stream(fields).map(field -> + new MeasurementSchema(field.getName(), + getTsDataTypeByString( + field.getType().getName() + ))). + collect(Collectors.toList()); + return schemaList; + } + + /** + * 根据对象构建MeasurementSchemaValuesDTO + * + * @param obj 对象 + * @return + */ + @SneakyThrows + @Override + public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) { + MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO(); + java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields(); + List schemaList = new ArrayList<>(); + List values = new ArrayList<>(); + List valuesIsNullIndex = new ArrayList<>(); + int valueIndex = 0; + for (java.lang.reflect.Field field : fields) { + MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName())); + schemaList.add(measurementSchema); + Object value = field.get(obj); + if (value == null) { + valuesIsNullIndex.add(valueIndex); + } + values.add(value); + valueIndex++; + } + measurementSchemaValuesDTO.setSchemaList(schemaList); + measurementSchemaValuesDTO.setValues(values); + return measurementSchemaValuesDTO; + } +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/DataJSON.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/DataJSON.java new file mode 100644 index 0000000..86c075e --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/DataJSON.java @@ -0,0 +1,33 @@ +package com.muyu.data.process.domain; + +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +/** + * @Author: zi run + * @Date 2024/9/29 0:11 + * @Description JSON数据对象 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +@Tag(name = "ionDB数据源对象") +public class DataJSON { + + /** + * 时间戳 + */ + @Schema(name = "时间戳") + private Long timestamp; + + /** + * 车辆JSON数据 + */ + @Schema(name = "车辆JSON数据") + private String datasource; +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java new file mode 100644 index 0000000..91d5808 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java @@ -0,0 +1,67 @@ +package com.muyu.data.process.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/29 0:12 + * @Description 插入数据 数据转换对象 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class InsertDataDTO { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 创建一个单一的InsertDataDTO实例,并设置默认值。 + * + * @return 一个配置好的InsertDataDTO对象 + */ + public InsertDataDTO buildOne() { + InsertDataDTO insertDataDTO = new InsertDataDTO(); + insertDataDTO.setHardware("ss"); + insertDataDTO.setStatus(true); + insertDataDTO.setTemperature(12.0F); + return insertDataDTO; + } + + /** + * 创建一个单一的InsertDataDTO实例,并设置默认值。 + * + * @return 一个配置好的InsertDataDTO对象 + */ + public List buildList() { + List insertDataDTOS = new ArrayList<>(); + int buildNum = 10; + for (int i = 0; i < buildNum; i++) { + InsertDataDTO insertDataDTO = new InsertDataDTO(); + insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null); + insertDataDTO.setStatus(i % 2 == 0); + insertDataDTO.setTemperature(12.0F + i); + insertDataDTOS.add(insertDataDTO); + } + return insertDataDTOS; + } +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/ResultEntity.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/ResultEntity.java new file mode 100644 index 0000000..c8326d2 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/ResultEntity.java @@ -0,0 +1,36 @@ +package com.muyu.data.process.domain; + +import com.muyu.data.process.domain.dto.IoTDbRecordAble; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @Author: zi run + * @Date 2024/9/29 0:18 + * @Description 结果实体 + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class ResultEntity extends IoTDbRecordAble { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 时间 + */ + private String time; + +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/TestDataType.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/TestDataType.java new file mode 100644 index 0000000..52d196c --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/TestDataType.java @@ -0,0 +1,43 @@ +package com.muyu.data.process.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: zi run + * @Date 2024/9/29 0:22 + * @Description 测试数据类型 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class TestDataType { + + /** + * 温度值 + */ + private Float temperature; + + /** + * 硬件标识 + */ + private String hardware; + + /** + * 状态标识 + */ + private Boolean status; + + /** + * 测试Double类型 + */ + private Double testDouble; + + /** + * 测试Long类型 + */ + private Long testLong; +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/IoTDbRecordAble.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/IoTDbRecordAble.java new file mode 100644 index 0000000..6525e38 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/IoTDbRecordAble.java @@ -0,0 +1,12 @@ +package com.muyu.data.process.domain.dto; + +import lombok.Data; + +/** + * @Author: zi run + * @Date 2024/9/29 0:23 + * @Description IoTDB数据库记录对象 + */ +@Data +public class IoTDbRecordAble { +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/MeasurementSchemaValuesDTO.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/MeasurementSchemaValuesDTO.java new file mode 100644 index 0000000..3e025dc --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/MeasurementSchemaValuesDTO.java @@ -0,0 +1,36 @@ +package com.muyu.data.process.domain.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; + +/** + * @Author: zi run + * @Date 2024/9/29 0:26 + * @Description 测量模式及其对应的值 数据传输对象 + */ +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class MeasurementSchemaValuesDTO { + + /** + * 测量模式列表,每个元素表示一个测量的定义,包括名称、数据类型等信息。 + */ + private List schemaList; + + /** + * 对应于测量模式的实际值列表,存储与 schemaList 中每个测量相对应的值。 + */ + private List values; + + /** + * 存储值为空的索引列表 + */ + private List valueIsNullIndex; +} diff --git a/cloud-modules/cloud-modules-data-process/src/main/resources/banner.txt b/cloud-modules/cloud-modules-data-process/src/main/resources/banner.txt new file mode 100644 index 0000000..0dd5eee --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/resources/banner.txt @@ -0,0 +1,2 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} diff --git a/cloud-modules/cloud-modules-data-process/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-data-process/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..39a6819 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/resources/bootstrap.yml @@ -0,0 +1,45 @@ +# Tomcat +server: + port: 11000 + +nacos: + addr: 106.15.136.7:8848 + user-name: nacos + password: nacos + namespace: xzr + +spring: + application: + # 应用名称 + name: cloud-data-process + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + config: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + # 系统共享配置 + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # 系统环境Config共享配置 + - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} \ No newline at end of file diff --git a/cloud-modules/cloud-modules-data-process/src/main/resources/logback/dev.xml b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/dev.xml new file mode 100644 index 0000000..6a776b3 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/dev.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-process/src/main/resources/logback/prod.xml b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/prod.xml new file mode 100644 index 0000000..839788c --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/prod.xml @@ -0,0 +1,81 @@ + + + + + + + + + + + + ${log.sky.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-process/src/main/resources/logback/test.xml b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/test.xml new file mode 100644 index 0000000..1ad1029 --- /dev/null +++ b/cloud-modules/cloud-modules-data-process/src/main/resources/logback/test.xml @@ -0,0 +1,81 @@ + + + + + + + + + + + + ${log.sky.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml index 911a9a6..fed66b6 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/pom.xml @@ -22,7 +22,6 @@ - com.alibaba.cloud diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/CloudEnterpriseApplication.java b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/CloudEnterpriseApplication.java index ddd295f..6429815 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/CloudEnterpriseApplication.java +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/java/com/muyu/enterprise/CloudEnterpriseApplication.java @@ -6,7 +6,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** - * 系统模块 + * 企业平台微服务启动类 * * @author muyu */ diff --git a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml index aed0523..1f2e969 100644 --- a/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-enterprise/cloud-modules-enterprise-server/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: diff --git a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml index eea6728..dd31e1b 100644 --- a/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-file/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml index 178e17b..7619514 100644 --- a/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-gen/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml index 9390135..296688a 100644 --- a/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml +++ b/cloud-modules/cloud-modules-system/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr spring: application: diff --git a/cloud-modules/pom.xml b/cloud-modules/pom.xml index cd65b6b..c104eeb 100644 --- a/cloud-modules/pom.xml +++ b/cloud-modules/pom.xml @@ -14,6 +14,7 @@ cloud-modules-file cloud-modules-enterprise cloud-weixin-mp + cloud-modules-data-process cloud-modules diff --git a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml index ddca326..f90fa0e 100644 --- a/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml +++ b/cloud-visual/cloud-visual-monitor/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ nacos: addr: 106.15.136.7:8848 user-name: nacos password: nacos - namespace: dev + namespace: xzr # Spring spring: diff --git a/pom.xml b/pom.xml index a49e303..def2bf5 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ 2.4.1 2.2.8 3.0.0 + 1.3.1 @@ -199,6 +200,13 @@ ${kafka.clients.verison} + + + org.apache.iotdb + iotdb-session + ${iotdb-session.verison} + + com.muyu