From 423836afb705e890b0d2c50f32816d9a31e3174d Mon Sep 17 00:00:00 2001 From: xinzirun Date: Sun, 29 Sep 2024 11:58:18 +0800 Subject: [PATCH] =?UTF-8?q?fix():=20=E4=BF=AE=E5=A4=8DIoTDB=E5=9F=BA?= =?UTF-8?q?=E7=A1=80=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process/basic/service/IoTDBService.java | 230 ++++++++++++++++-- .../basic/service/impl/IoTDBServiceImpl.java | 199 +++++++++------ .../domain/{ => dto}/InsertDataDTO.java | 2 +- 3 files changed, 331 insertions(+), 100 deletions(-) rename cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/{ => dto}/InsertDataDTO.java (97%) diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java index 0a99fd6..a869c69 100644 --- a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/IoTDBService.java @@ -25,61 +25,240 @@ public interface IoTDBService { */ void insertTablet(Tablet tablet); + /** + * 将给定的 Tablets 插入到 IoTDB 数据库中。 + * + * @param tablets 一个 Map,包含要插入的 Tablets + */ void insertTablets(Map tablets); + /** + * 单条数据插入(string类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param values 数据项对应值列表 + */ void insertStringRecord(String deviceId, long time, List measurements, List values); - void insertRecord(String deviceId, long time, List measurements, List types, List values); + /** + * 单条数据插入(不同类型数据项) + * + * @param deviceId 设备名(表名)root.ln.wf01.wt01 + * @param time 时间戳 + * @param measurements 数据项列表 + * @param types 数据项对应类型列表 + * @param values 数据项对应值列表 + */ + void insertRecord(String deviceId, long time, List measurements, + List types, List values); - void insertStringRecords(List deviceIds, List times, List> measurementsList, List> valuesList); + /** + * 多个设备多条数据插入(string类型数据项) + * + * @param deviceIds 多个设备名(表名)root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertStringRecords(List deviceIds, List times, + List> measurementsList, List> valuesList); - void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList); + /** + * 多个设备多条数据插入(不同类型数据项) + * + * @param deviceIds 多个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertRecords(List deviceIds, List times, List> measurementsList, + List> typesList, List> valuesList); - void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList); + /** + * 单个设备多条数据插入(string类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertStringRecordsOfOneDevice(String deviceId, List times, + List> measurementsList, List> valuesList); - void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList); + /** + * 单个设备多条数据插入(不同类型数据项) + * + * @param deviceId 单个设备名(表名))root.ln.wf01.wt01 + * @param times 时间戳的列表 + * @param measurementsList 数据项列表的列表 + * @param typesList 数据项对应类型列表的列表 + * @param valuesList 数据项对应值列表的列表 + */ + void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> typesList, List> valuesList); + /** + * 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据) + * + * @param path 单个字段 root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ void deleteData(String path, long endTime); + /** + * 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param endTime 删除时间点 + */ void deleteData(List paths, long endTime); - SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long timeOut); + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param outTime 超时时间 + * @return SessionDataSet (Time,paths) + */ + SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime, long outTime); - List executeRawDataQuery(List paths, long startTime, long endTime, long timeOut, Class clazz); + /** + * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" + * @param startTime 查询数据的起始时间(包含该时间点) + * @param endTime 查询数据的结束时间(不包含该时间点) + * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 + * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 + * @param 返回数据的对象类型泛型 + * @return 查询结果的对象列表,如果查询失败则返回 null + */ + List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, + Class clazz); + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @return SessionDataSet + */ SessionDataSet executeLastDataQuery(List paths, long lastTime); + /** + * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param lastTime 结束时间 + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ List executeLastDataQuery(List paths, long lastTime, Class clazz); - SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes); + /** + * 最新点查询(快速查询单设备下指定序列最新点) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @return SessionDataSet + */ + SessionDataSet executeLastDataQueryForOneDevice(String db, String device, + List sensors, boolean isLegalPathNodes); - List executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes, Class clazz); + /** + * 查询单个设备的最新数据(获取指定设备的最新传感器数据) + * + * @param db root.ln.wf01 + * @param device root.ln.wf01.wt01 + * @param sensors temperature,status(字段名) + * @param isLegalPathNodes true(避免路径校验) + * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 + */ + List executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes, Class clazz); + /** + * 聚合查询 + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @return SessionDataSet + */ SessionDataSet executeAggregationQuery(List paths, List aggregations); - SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime); + /** + * 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime); - SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval); + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval); - SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep); + /** + * 聚合查询(支持按照时间区间分段查询) + * + * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature + * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT + * @param startTime 开始时间(包含) + * @param endTime 结束时间 + * @param interval 查询的时间间隔(单位为毫秒) + * @param slidingStep 滑动步长(单位为毫秒) + * @return SessionDataSet + */ + SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval, long slidingStep); + /** + * SQL查询 + * + * @param sql SQL查询语句,支持IotDB的查询语法 + * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null + */ SessionDataSet executeQueryStatement(String sql); /** * SQL非查询 * - * @param sql + * @param sql SQL查询语句,支持IotDB的查询语法 */ void executeNonQueryStatement(String sql); /** * 封装处理数据 * - * @param sessionDataSet - * @param columnNames + * @param sessionDataSet 包含查询结果的SessionDataSet对象 + * @param titleList 列标题列表,用于映射字段名称 + * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 */ - List> packagingMapData(SessionDataSet sessionDataSet, List columnNames); + List> packagingMapData(SessionDataSet sessionDataSet, List titleList); /** * 封装处理数据(不支持聚合查询) @@ -87,24 +266,25 @@ public interface IoTDBService { * @param sessionDataSet 查询返回的结果集 * @param titleList 查询返回的结果集内的字段名 * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @param - * @return + * @return 返回封装后的对象列表,每个对象对应一行结果集数据 + * @param 返回对象的类型 */ - List packagingObjectData(SessionDataSet sessionDataSet, List titleList, Class clazz); + List packagingObjectData(SessionDataSet sessionDataSet, List titleList, + Class clazz); /** * 根据对象构建MeasurementSchemas * - * @param object 对象 - * @return + * @param obj 要从中提取字段信息的对象 + * @return 返回一个包含 MeasurementSchema 的列表 */ - List buildMeasurementSchemas(Object object); + List buildMeasurementSchemas(Object obj); /** * 根据对象构建MeasurementSchemaValuesDTO * - * @param object 对象 - * @return + * @param obj 要从中提取字段信息和对应值的对象 + * @return MeasurementSchemaValuesDTO 对象 */ - MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object); + MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj); } diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java index d551a1a..fcbd374 100644 --- a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/basic/service/impl/IoTDBServiceImpl.java @@ -57,8 +57,7 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 将给定的 Tablets 插入到 IoTDB 数据库中。 * - * @param tablets 一个 Map,包含要插入的 Tablets,键为 String 类型,值为 Tablet 对象。 - * Tablets 应该已经准备好并符合 IoTDB 的插入要求。 + * @param tablets 一个 Map,包含要插入的 Tablets */ @Override public void insertTablets(Map tablets) { @@ -101,14 +100,16 @@ public class IoTDBServiceImpl implements IoTDBService { * @param values 数据项对应值列表 */ @Override - public void insertRecord(String deviceId, long time, List measurements, List types, List values) { + public void insertRecord(String deviceId, long time, List measurements, + List types, List values) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); try { - log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values); + log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", + deviceId, measurements, types, values); sessionPool.insertRecord(deviceId, time, measurements, types, values); } catch (Exception e) { - log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}", - deviceId, time, measurements, types, values, e.getMessage()); + log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " + + "values={}, error={}", deviceId, time, measurements, types, values, e.getMessage()); } } @@ -122,14 +123,16 @@ public class IoTDBServiceImpl implements IoTDBService { * @param valuesList 数据项对应值列表的列表 */ @Override - public void insertStringRecords(List deviceIds, List times, List> measurementsList, List> valuesList) { + public void insertStringRecords(List deviceIds, List times, List> measurementsList, + List> valuesList) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); try { - log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList); + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", + deviceIds, measurementsList, valuesList); sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList); } catch (Exception e) { - log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}", - deviceIds, times, measurementsList, valuesList, e.getMessage()); + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " + + "valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage()); } } @@ -143,13 +146,16 @@ public class IoTDBServiceImpl implements IoTDBService { * @param valuesList 数据项对应值列表的列表 */ @Override - public void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList) { + public void insertRecords(List deviceIds, List times, List> measurementsList, + List> typesList, List> valuesList) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); try { - log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList); + log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", + deviceIds, measurementsList, typesList, valuesList); sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); } catch (Exception e) { - log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", + log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " + + "valuesList={}, error={}", deviceIds, times, measurementsList, typesList, valuesList, e.getMessage()); } } @@ -163,13 +169,16 @@ public class IoTDBServiceImpl implements IoTDBService { * @param valuesList 数据项对应值列表的列表 */ @Override - public void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList) { + public void insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> valuesList) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); try { - log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList); + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", + deviceId, measurementsList, valuesList); sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); } catch (Exception e) { - log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}", + log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " + + "measurementsList={}, valuesList={}, error={}", deviceId, times, measurementsList, valuesList, e.getMessage()); } } @@ -184,13 +193,17 @@ public class IoTDBServiceImpl implements IoTDBService { * @param valuesList 数据项对应值列表的列表 */ @Override - public void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList) { + public void insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, + List> typesList, List> valuesList) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); try { - log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList); + log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", + deviceId, measurementsList, typesList, valuesList); sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); } catch (Exception e) { - log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); + log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " + + "measurementsList={},typesList={},valuesList={}, error={}", + deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); } } @@ -242,11 +255,13 @@ public class IoTDBServiceImpl implements IoTDBService { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionDataSetWrapper sessionDataSetWrapper = null; try { - log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime); + log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", + paths, startTime, endTime, outTime); sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + + "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -256,23 +271,25 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间) * - * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature - * @param startTime 开始时间 - * @param endTime 结束时间 - * @param outTime 超时时间 - * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @param - * @return + * @param paths 多个字段(表名),例如:"root.ln.wf01.wt01.temperature" + * @param startTime 查询数据的起始时间(包含该时间点) + * @param endTime 查询数据的结束时间(不包含该时间点) + * @param outTime 超时时间,单位为毫秒,表示查询的最长等待时间 + * @param clazz 返回数据对应的对象类型,要求对象属性与数据库字段名一致 + * @param 返回数据的对象类型泛型 + * @return 查询结果的对象列表,如果查询失败则返回 null */ @Override - public List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, Class clazz) { + public List executeRawDataQuery(List paths, long startTime, long endTime, long outTime, + Class clazz) { SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime); List columnNames = sessionDataSet.getColumnNames(); List resultEntities = null; try { resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); } catch (Exception e) { - log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); + log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}], " + + "outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage()); } return resultEntities; } @@ -293,7 +310,8 @@ public class IoTDBServiceImpl implements IoTDBService { sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage()); + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", + paths, lastTime, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -303,11 +321,11 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 最新点查询(查询最后一条时间戳大于等于某个时间点的数据) * - * @param * @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature * @param lastTime 结束时间 * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @return + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 */ @Override public List executeLastDataQuery(List paths, long lastTime, Class clazz) { @@ -317,7 +335,8 @@ public class IoTDBServiceImpl implements IoTDBService { try { resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); } catch (Exception e) { - log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage()); + log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", + paths, lastTime, e.getMessage()); } return resultEntities; } @@ -332,15 +351,18 @@ public class IoTDBServiceImpl implements IoTDBService { * @return SessionDataSet */ @Override - public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes) { + public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionDataSetWrapper sessionDataSetWrapper = null; try { - log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes); + log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", + db, device, sensors, isLegalPathNodes); sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " + + "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -348,23 +370,28 @@ public class IoTDBServiceImpl implements IoTDBService { } /** + * 查询单个设备的最新数据(获取指定设备的最新传感器数据) + * * @param db root.ln.wf01 * @param device root.ln.wf01.wt01 * @param sensors temperature,status(字段名) * @param isLegalPathNodes true(避免路径校验) * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @param - * @return + * @return 查询结果的对象列表,如果查询失败则返回 null + * @param 返回数据的对象类型泛型 */ @Override - public List executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes, Class clazz) { + public List executeLastDataQueryForOneDevice(String db, String device, List sensors, + boolean isLegalPathNodes, + Class clazz) { SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); List columnNames = sessionDataSet.getColumnNames(); List resultEntities = null; try { resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); } catch (Exception e) { - log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); + log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " + + "isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); } return resultEntities; } @@ -385,7 +412,8 @@ public class IoTDBServiceImpl implements IoTDBService { sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", paths, aggregations, e.getMessage()); + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", + paths, aggregations, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -402,15 +430,18 @@ public class IoTDBServiceImpl implements IoTDBService { * @return SessionDataSet */ @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime) { + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionDataSetWrapper sessionDataSetWrapper = null; try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", paths, aggregations, startTime, endTime); + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", + paths, aggregations, startTime, endTime); sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}], " + + "startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -424,19 +455,25 @@ public class IoTDBServiceImpl implements IoTDBService { * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT * @param startTime 开始时间(包含) * @param endTime 结束时间 - * @param interval + * @param interval 查询的时间间隔(单位为毫秒) * @return SessionDataSet */ @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval) { + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionDataSetWrapper sessionDataSetWrapper = null; try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval); - sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval); + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", + paths, aggregations, startTime, endTime, interval); + sessionDataSetWrapper = sessionPool.executeAggregationQuery( + paths, aggregations, startTime, endTime, interval + ); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage()); + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + + "startTime:[{}], endTime:[{}], interval:[{}], error={}", + paths, aggregations, startTime, endTime, interval, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -450,21 +487,26 @@ public class IoTDBServiceImpl implements IoTDBService { * @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT * @param startTime 开始时间(包含) * @param endTime 结束时间 - * @param interval - * @param slidingStep + * @param interval 查询的时间间隔(单位为毫秒) + * @param slidingStep 滑动步长(单位为毫秒) * @return SessionDataSet */ @Override - public SessionDataSet executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep) { + public SessionDataSet executeAggregationQuery(List paths, List aggregations, + long startTime, long endTime, long interval, long slidingStep) { SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionDataSetWrapper sessionDataSetWrapper = null; try { - log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); - sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep); + log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], " + + "slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); + sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, + interval, slidingStep); return sessionDataSetWrapper.getSessionDataSet(); } catch (Exception e) { - log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); + log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] , " + + "startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", + paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); } finally { sessionPool.closeResultSet(sessionDataSetWrapper); } @@ -474,8 +516,8 @@ public class IoTDBServiceImpl implements IoTDBService { /** * SQL查询 * - * @param sql - * @return + * @param sql SQL查询语句,支持IotDB的查询语法 + * @return 返回查询结果的 SessionDataSet,如果执行失败则返回 null */ @Override public SessionDataSet executeQueryStatement(String sql) { @@ -497,7 +539,7 @@ public class IoTDBServiceImpl implements IoTDBService { /** * SQL非查询 * - * @param sql + * @param sql SQL查询语句,支持IotDB的查询语法 */ @Override public void executeNonQueryStatement(String sql) { @@ -513,8 +555,9 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 封装处理数据 * - * @param sessionDataSet - * @param titleList + * @param sessionDataSet 包含查询结果的SessionDataSet对象 + * @param titleList 列标题列表,用于映射字段名称 + * @return 返回封装后的数据列表,每个 Map 代表一行数据,键为列名,值为对应的字段值 */ @SneakyThrows @Override @@ -527,14 +570,16 @@ public class IoTDBServiceImpl implements IoTDBService { Map resultMap = new HashMap<>(); RowRecord next = sessionDataSet.next(); List fields = next.getFields(); - String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(next.getTimestamp()); resultMap.put("time", timeString); for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { resultMap.put(splitString(titleList.get(i)), null); } else { - resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString()); + resultMap.put(splitString(titleList.get(i)), + field.getObjectValue(field.getDataType()).toString()); } } resultList.add(resultMap); @@ -549,12 +594,13 @@ public class IoTDBServiceImpl implements IoTDBService { * @param sessionDataSet 查询返回的结果集 * @param titleList 查询返回的结果集内的字段名 * @param clazz 返回数据对应的对象(对象属性必须与字段名对应) - * @param - * @return + * @return 返回封装后的对象列表,每个对象对应一行结果集数据 + * @param 返回对象的类型 */ @SneakyThrows @Override - public List packagingObjectData(SessionDataSet sessionDataSet, List titleList, Class clazz) { + public List packagingObjectData(SessionDataSet sessionDataSet, List titleList, + Class clazz) { int fetchSize = sessionDataSet.getFetchSize(); List resultList = new ArrayList<>(); titleList.remove("Time"); @@ -563,7 +609,8 @@ public class IoTDBServiceImpl implements IoTDBService { Map resultMap = new HashMap<>(); RowRecord next = sessionDataSet.next(); List fields = next.getFields(); - String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); + String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(next.getTimestamp()); resultMap.put("time", timeString); if (titleList.stream().anyMatch(str -> str.contains("."))) { for (int i = 0; i < fields.size(); i++) { @@ -581,7 +628,10 @@ public class IoTDBServiceImpl implements IoTDBService { Field fieldDataType = fields.get(2); if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) { String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString(); - Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()); + Object mapValue = convertStringToType( + fieldValue.getObjectValue(fieldValue.getDataType()).toString(), + fieldDataType.getObjectValue(fieldDataType.getDataType()).toString() + ); resultMap.put(splitString(mapKey), mapValue); } } @@ -596,7 +646,7 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 分割获取字段名 * - * @param str + * @param str 输入的字符串 * @return 字段名 */ public static String splitString(String str) { @@ -667,8 +717,8 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 根据对象构建MeasurementSchemas * - * @param obj 对象 - * @return + * @param obj 要从中提取字段信息的对象 + * @return 返回一个包含 MeasurementSchema 的列表 */ @Override public List buildMeasurementSchemas(Object obj) { @@ -685,8 +735,8 @@ public class IoTDBServiceImpl implements IoTDBService { /** * 根据对象构建MeasurementSchemaValuesDTO * - * @param obj 对象 - * @return + * @param obj 要从中提取字段信息和对应值的对象 + * @return MeasurementSchemaValuesDTO 对象 */ @SneakyThrows @Override @@ -698,7 +748,8 @@ public class IoTDBServiceImpl implements IoTDBService { List valuesIsNullIndex = new ArrayList<>(); int valueIndex = 0; for (java.lang.reflect.Field field : fields) { - MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName())); + MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), + getTsDataTypeByString(field.getType().getName())); schemaList.add(measurementSchema); Object value = field.get(obj); if (value == null) { diff --git a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/InsertDataDTO.java similarity index 97% rename from cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java rename to cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/InsertDataDTO.java index 91d5808..1560ac6 100644 --- a/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/InsertDataDTO.java +++ b/cloud-modules/cloud-modules-data-process/src/main/java/com/muyu/data/process/domain/dto/InsertDataDTO.java @@ -1,4 +1,4 @@ -package com.muyu.data.process.domain; +package com.muyu.data.process.domain.dto; import lombok.AllArgsConstructor; import lombok.Builder;