fix(): 修复IoTDB基础类

dev.vehicleGateway
xinzirun 2024-09-29 11:58:18 +08:00
parent 47567ff055
commit 423836afb7
3 changed files with 331 additions and 100 deletions

View File

@ -25,61 +25,240 @@ public interface IoTDBService {
*/ */
void insertTablet(Tablet tablet); void insertTablet(Tablet tablet);
/**
* Tablets IoTDB
*
* @param tablets Map Tablets
*/
void insertTablets(Map<String, Tablet> tablets); void insertTablets(Map<String, Tablet> tablets);
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param values
*/
void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values); void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values); /**
*
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param types
* @param values
*/
void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types, List<Object> values);
void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList); /**
* string
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
void insertStringRecords(List<String> deviceIds, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList);
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList); /**
*
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList); /**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,
List<List<String>> measurementsList, List<List<String>> valuesList);
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList); /**
*
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList);
/**
*
*
* @param path root.ln.wf01.wt01.temperature
* @param endTime
*/
void deleteData(String path, long endTime); void deleteData(String path, long endTime);
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param endTime
*/
void deleteData(List<String> paths, long endTime); void deleteData(List<String> paths, long endTime);
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut); /**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param startTime
* @param endTime
* @param outTime
* @return SessionDataSet (Time,paths)
*/
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime);
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut, Class<? extends IoTDbRecordAble> clazz); /**
*
*
* @param paths "root.ln.wf01.wt01.temperature"
* @param startTime
* @param endTime
* @param outTime
* @param clazz
* @param <T>
* @return null
*/
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
Class<? extends IoTDbRecordAble> clazz);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @return SessionDataSet
*/
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime); SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @param clazz
* @return null
* @param <T>
*/
<T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz); <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz);
SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes); /**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @return SessionDataSet
*/
SessionDataSet executeLastDataQueryForOneDevice(String db, String device,
List<String> sensors, boolean isLegalPathNodes);
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz); /**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @param clazz
* @return null
* @param <T>
*/
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz);
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations); SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime); /**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval); /**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep); /**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @param slidingStep
* @return SessionDataSet
*/
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval, long slidingStep);
/**
* SQL
*
* @param sql SQLIotDB
* @return SessionDataSet null
*/
SessionDataSet executeQueryStatement(String sql); SessionDataSet executeQueryStatement(String sql);
/** /**
* SQL * SQL
* *
* @param sql * @param sql SQLIotDB
*/ */
void executeNonQueryStatement(String sql); void executeNonQueryStatement(String sql);
/** /**
* *
* *
* @param sessionDataSet * @param sessionDataSet SessionDataSet
* @param columnNames * @param titleList
* @return Map
*/ */
List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> columnNames); List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList);
/** /**
* *
@ -87,24 +266,25 @@ public interface IoTDBService {
* @param sessionDataSet * @param sessionDataSet
* @param titleList * @param titleList
* @param clazz * @param clazz
* @param <T> * @return
* @return * @param <T>
*/ */
<T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IoTDbRecordAble> clazz); <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
Class<? extends IoTDbRecordAble> clazz);
/** /**
* MeasurementSchemas * MeasurementSchemas
* *
* @param object * @param obj
* @return * @return MeasurementSchema
*/ */
List<MeasurementSchema> buildMeasurementSchemas(Object object); List<MeasurementSchema> buildMeasurementSchemas(Object obj);
/** /**
* MeasurementSchemaValuesDTO * MeasurementSchemaValuesDTO
* *
* @param object * @param obj
* @return * @return MeasurementSchemaValuesDTO
*/ */
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object); MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj);
} }

View File

@ -57,8 +57,7 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* Tablets IoTDB * Tablets IoTDB
* *
* @param tablets Map Tablets String Tablet * @param tablets Map Tablets
* Tablets IoTDB
*/ */
@Override @Override
public void insertTablets(Map<String, Tablet> tablets) { public void insertTablets(Map<String, Tablet> tablets) {
@ -101,14 +100,16 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param values * @param values
*/ */
@Override @Override
public void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) { public void insertRecord(String deviceId, long time, List<String> measurements,
List<TSDataType> types, List<Object> values) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try { try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values); log.info("iotdb数据入库device_id:[{}], measurements:[{}], types:[{}], values:[{}]",
deviceId, measurements, types, values);
sessionPool.insertRecord(deviceId, time, measurements, types, values); sessionPool.insertRecord(deviceId, time, measurements, types, values);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}", log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={}, types={}, " +
deviceId, time, measurements, types, values, e.getMessage()); "values={}, error={}", deviceId, time, measurements, types, values, e.getMessage());
} }
} }
@ -122,14 +123,16 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param valuesList * @param valuesList
*/ */
@Override @Override
public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) { public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try { try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList); log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], valuesList:[{}]",
deviceIds, measurementsList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList); sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}", log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, " +
deviceIds, times, measurementsList, valuesList, e.getMessage()); "valuesList={}, error={}", deviceIds, times, measurementsList, valuesList, e.getMessage());
} }
} }
@ -143,13 +146,16 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param valuesList * @param valuesList
*/ */
@Override @Override
public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) { public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try { try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList); log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]",
deviceIds, measurementsList, typesList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList={}, " +
"valuesList={}, error={}",
deviceIds, times, measurementsList, typesList, valuesList, e.getMessage()); deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
} }
} }
@ -163,13 +169,16 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param valuesList * @param valuesList
*/ */
@Override @Override
public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) { public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try { try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList); log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], valuesList:[{}]",
deviceId, measurementsList, valuesList);
sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}", log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, " +
"measurementsList={}, valuesList={}, error={}",
deviceId, times, measurementsList, valuesList, e.getMessage()); deviceId, times, measurementsList, valuesList, e.getMessage());
} }
} }
@ -184,13 +193,17 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param valuesList * @param valuesList
*/ */
@Override @Override
public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) { public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList,
List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try { try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList); log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]",
deviceId, measurementsList, typesList, valuesList);
sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage()); log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, " +
"measurementsList={},typesList={},valuesList={}, error={}",
deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
} }
} }
@ -242,11 +255,13 @@ public class IoTDBServiceImpl implements IoTDBService {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null; SessionDataSetWrapper sessionDataSetWrapper = null;
try { try {
log.info("iotdb数据查询paths:[{}], startTime[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime); log.info("iotdb数据查询paths:[{}], startTime[{}], endTime:[{}],outTime:[{}]",
paths, startTime, endTime, outTime);
sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime); sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}], " +
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -256,23 +271,25 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* *
* *
* @param paths root.ln.wf01.wt01.temperature * @param paths "root.ln.wf01.wt01.temperature"
* @param startTime * @param startTime
* @param endTime * @param endTime
* @param outTime * @param outTime
* @param clazz * @param clazz
* @param <T> * @param <T>
* @return * @return null
*/ */
@Override @Override
public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime, Class<? extends IoTDbRecordAble> clazz) { public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime,
Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime); SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
List<String> columnNames = sessionDataSet.getColumnNames(); List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null; List<T> resultEntities = null;
try { try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage()); log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}], " +
"outTime:[{}], error={}", paths, startTime, endTime, outTime, e.getMessage());
} }
return resultEntities; return resultEntities;
} }
@ -293,7 +310,8 @@ public class IoTDBServiceImpl implements IoTDBService {
sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime); sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}", paths, lastTime, e.getMessage()); log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}",
paths, lastTime, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -303,11 +321,11 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* () * ()
* *
* @param <T>
* @param paths root.ln.wf01.wt01.temperature * @param paths root.ln.wf01.wt01.temperature
* @param lastTime * @param lastTime
* @param clazz * @param clazz
* @return * @return null
* @param <T>
*/ */
@Override @Override
public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz) { public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz) {
@ -317,7 +335,8 @@ public class IoTDBServiceImpl implements IoTDBService {
try { try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}", paths, lastTime, e.getMessage()); log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}",
paths, lastTime, e.getMessage());
} }
return resultEntities; return resultEntities;
} }
@ -332,15 +351,18 @@ public class IoTDBServiceImpl implements IoTDBService {
* @return SessionDataSet * @return SessionDataSet
*/ */
@Override @Override
public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) { public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null; SessionDataSetWrapper sessionDataSetWrapper = null;
try { try {
log.info("iotdb数据查询db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes); log.info("iotdb数据查询db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]",
db, device, sensors, isLegalPathNodes);
sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}], sensors:[{}], " +
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -348,23 +370,28 @@ public class IoTDBServiceImpl implements IoTDBService {
} }
/** /**
*
*
* @param db root.ln.wf01 * @param db root.ln.wf01
* @param device root.ln.wf01.wt01 * @param device root.ln.wf01.wt01
* @param sensors temperaturestatus() * @param sensors temperaturestatus()
* @param isLegalPathNodes true() * @param isLegalPathNodes true()
* @param clazz * @param clazz
* @param <T> * @return null
* @return * @param <T>
*/ */
@Override @Override
public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz) { public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors,
boolean isLegalPathNodes,
Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
List<String> columnNames = sessionDataSet.getColumnNames(); List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null; List<T> resultEntities = null;
try { try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz); resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage()); log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], " +
"isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
} }
return resultEntities; return resultEntities;
} }
@ -385,7 +412,8 @@ public class IoTDBServiceImpl implements IoTDBService {
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations); sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,error={}", paths, aggregations, e.getMessage()); log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,error={}",
paths, aggregations, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -402,15 +430,18 @@ public class IoTDBServiceImpl implements IoTDBService {
* @return SessionDataSet * @return SessionDataSet
*/ */
@Override @Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) { public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null; SessionDataSetWrapper sessionDataSetWrapper = null;
try { try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}]", paths, aggregations, startTime, endTime); log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}]",
paths, aggregations, startTime, endTime);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime); sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage()); log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}], " +
"startTime[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -424,19 +455,25 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT * @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime () * @param startTime ()
* @param endTime * @param endTime
* @param interval * @param interval
* @return SessionDataSet * @return SessionDataSet
*/ */
@Override @Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) { public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null; SessionDataSetWrapper sessionDataSetWrapper = null;
try { try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval); log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}]",
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval); paths, aggregations, startTime, endTime, interval);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(
paths, aggregations, startTime, endTime, interval
);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage()); log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] , " +
"startTime[{}], endTime:[{}], interval:[{}], error={}",
paths, aggregations, startTime, endTime, interval, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -450,21 +487,26 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT * @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime () * @param startTime ()
* @param endTime * @param endTime
* @param interval * @param interval
* @param slidingStep * @param slidingStep
* @return SessionDataSet * @return SessionDataSet
*/ */
@Override @Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) { public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations,
long startTime, long endTime, long interval, long slidingStep) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool(); SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null; SessionDataSetWrapper sessionDataSetWrapper = null;
try { try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep); log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}], " +
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep); "slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime,
interval, slidingStep);
return sessionDataSetWrapper.getSessionDataSet(); return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) { } catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage()); log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] , " +
"startTime[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}",
paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
} finally { } finally {
sessionPool.closeResultSet(sessionDataSetWrapper); sessionPool.closeResultSet(sessionDataSetWrapper);
} }
@ -474,8 +516,8 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* SQL * SQL
* *
* @param sql * @param sql SQLIotDB
* @return * @return SessionDataSet null
*/ */
@Override @Override
public SessionDataSet executeQueryStatement(String sql) { public SessionDataSet executeQueryStatement(String sql) {
@ -497,7 +539,7 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* SQL * SQL
* *
* @param sql * @param sql SQLIotDB
*/ */
@Override @Override
public void executeNonQueryStatement(String sql) { public void executeNonQueryStatement(String sql) {
@ -513,8 +555,9 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* *
* *
* @param sessionDataSet * @param sessionDataSet SessionDataSet
* @param titleList * @param titleList
* @return Map
*/ */
@SneakyThrows @SneakyThrows
@Override @Override
@ -527,14 +570,16 @@ public class IoTDBServiceImpl implements IoTDBService {
Map<String, Object> resultMap = new HashMap<>(); Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next(); RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields(); List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(next.getTimestamp());
resultMap.put("time", timeString); resultMap.put("time", timeString);
for (int i = 0; i < fields.size(); i++) { for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i); Field field = fields.get(i);
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) { if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
resultMap.put(splitString(titleList.get(i)), null); resultMap.put(splitString(titleList.get(i)), null);
} else { } else {
resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString()); resultMap.put(splitString(titleList.get(i)),
field.getObjectValue(field.getDataType()).toString());
} }
} }
resultList.add(resultMap); resultList.add(resultMap);
@ -549,12 +594,13 @@ public class IoTDBServiceImpl implements IoTDBService {
* @param sessionDataSet * @param sessionDataSet
* @param titleList * @param titleList
* @param clazz * @param clazz
* @param <T> * @return
* @return * @param <T>
*/ */
@SneakyThrows @SneakyThrows
@Override @Override
public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IoTDbRecordAble> clazz) { public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList,
Class<? extends IoTDbRecordAble> clazz) {
int fetchSize = sessionDataSet.getFetchSize(); int fetchSize = sessionDataSet.getFetchSize();
List<T> resultList = new ArrayList<>(); List<T> resultList = new ArrayList<>();
titleList.remove("Time"); titleList.remove("Time");
@ -563,7 +609,8 @@ public class IoTDBServiceImpl implements IoTDBService {
Map<String, Object> resultMap = new HashMap<>(); Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next(); RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields(); List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp()); String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(next.getTimestamp());
resultMap.put("time", timeString); resultMap.put("time", timeString);
if (titleList.stream().anyMatch(str -> str.contains("."))) { if (titleList.stream().anyMatch(str -> str.contains("."))) {
for (int i = 0; i < fields.size(); i++) { for (int i = 0; i < fields.size(); i++) {
@ -581,7 +628,10 @@ public class IoTDBServiceImpl implements IoTDBService {
Field fieldDataType = fields.get(2); Field fieldDataType = fields.get(2);
if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) { if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString(); String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()); Object mapValue = convertStringToType(
fieldValue.getObjectValue(fieldValue.getDataType()).toString(),
fieldDataType.getObjectValue(fieldDataType.getDataType()).toString()
);
resultMap.put(splitString(mapKey), mapValue); resultMap.put(splitString(mapKey), mapValue);
} }
} }
@ -596,7 +646,7 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* *
* *
* @param str * @param str
* @return * @return
*/ */
public static String splitString(String str) { public static String splitString(String str) {
@ -667,8 +717,8 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* MeasurementSchemas * MeasurementSchemas
* *
* @param obj * @param obj
* @return * @return MeasurementSchema
*/ */
@Override @Override
public List<MeasurementSchema> buildMeasurementSchemas(Object obj) { public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
@ -685,8 +735,8 @@ public class IoTDBServiceImpl implements IoTDBService {
/** /**
* MeasurementSchemaValuesDTO * MeasurementSchemaValuesDTO
* *
* @param obj * @param obj
* @return * @return MeasurementSchemaValuesDTO
*/ */
@SneakyThrows @SneakyThrows
@Override @Override
@ -698,7 +748,8 @@ public class IoTDBServiceImpl implements IoTDBService {
List<Integer> valuesIsNullIndex = new ArrayList<>(); List<Integer> valuesIsNullIndex = new ArrayList<>();
int valueIndex = 0; int valueIndex = 0;
for (java.lang.reflect.Field field : fields) { for (java.lang.reflect.Field field : fields) {
MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName())); MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(),
getTsDataTypeByString(field.getType().getName()));
schemaList.add(measurementSchema); schemaList.add(measurementSchema);
Object value = field.get(obj); Object value = field.get(obj);
if (value == null) { if (value == null) {

View File

@ -1,4 +1,4 @@
package com.muyu.data.process.domain; package com.muyu.data.process.domain.dto;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;