feat(): 新增IoTDB配置类、基础业务类

dev.vehicleGateway
xinzirun 2024-09-29 00:55:53 +08:00
parent 38a096ea72
commit 47567ff055
27 changed files with 1534 additions and 9 deletions

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
# Spring # Spring
spring: spring:
application: application:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
# Spring # Spring
spring: spring:

View File

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.muyu</groupId>
<artifactId>cloud-modules</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>cloud-modules-data-process</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<description>
cloud-modules-data-process 数据处理
</description>
<dependencies>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- MuYu Common DataSource -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-datasource</artifactId>
</dependency>
<!-- MuYu Common DataScope -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-datascope</artifactId>
</dependency>
<!-- MuYu Common Log -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-log</artifactId>
</dependency>
<!-- 接口模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-api-doc</artifactId>
</dependency>
<!-- 公共核心模块 -->
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
</dependency>
<!-- IotDB会话 -->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,20 @@
package com.muyu.data.process;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: zi run
* @Date 2024/9/28 22:31
* @Description
*/
@EnableCustomConfig
@EnableMyFeignClients
@SpringBootApplication
public class CloudDataProcessApplication {
public static void main(String[] args) {
SpringApplication.run(CloudDataProcessApplication.class, args);
}
}

View File

@ -0,0 +1,72 @@
package com.muyu.data.process.basic.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.session.pool.SessionPool;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/28 22:41
* @Description IoTDB
*/
@Slf4j
@Component
@Configuration
public class IoTDBSessionConfig {
@Value("${spring.iotdb.username}")
private String username;
@Value("${spring.iotdb.password}")
private String password;
@Value("${spring.iotdb.ip}")
private String ip;
@Value("${spring.iotdb.port}")
private int port;
@Value("${spring.iotdb.maxSize}")
private int maxSize;
/**
* IoTDB
*/
private static SessionPool sessionPool = null;
/**
* IoTDB
* @return ioTDB
*/
public SessionPool getSessionPool() {
if (sessionPool == null) {
sessionPool = new SessionPool(ip, port, username, password, maxSize);
}
return sessionPool;
}
/**
* IoTDB
*
* @param deviceId
* @param time
* @param measurements
* @param values
*
* <p> IoTDB
* 便</p>
*/
public void insertRecord(String deviceId, long time, List<String> measurements, List<String> values) {
getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
sessionPool.insertRecord(deviceId, time, measurements, values);
} catch (Exception e) {
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
deviceId, time, measurements, values, e.getMessage());
}
}
}

View File

@ -0,0 +1,110 @@
package com.muyu.data.process.basic.service;
import com.muyu.data.process.domain.dto.IoTDbRecordAble;
import com.muyu.data.process.domain.dto.MeasurementSchemaValuesDTO;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
/**
* @Author: zi run
* @Date 2024/9/28 23:37
* @Description IoTDB
*/
public interface IoTDBService {
/**
* Tablet IoTDB
*
* @param tablet Tablet
*/
void insertTablet(Tablet tablet);
void insertTablets(Map<String, Tablet> tablets);
void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values);
void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
void deleteData(String path, long endTime);
void deleteData(List<String> paths, long endTime);
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut);
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut, Class<? extends IoTDbRecordAble> clazz);
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
<T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz);
SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes);
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval);
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep);
SessionDataSet executeQueryStatement(String sql);
/**
* SQL
*
* @param sql
*/
void executeNonQueryStatement(String sql);
/**
*
*
* @param sessionDataSet
* @param columnNames
*/
List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> columnNames);
/**
*
*
* @param sessionDataSet
* @param titleList
* @param clazz
* @param <T>
* @return
*/
<T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IoTDbRecordAble> clazz);
/**
* MeasurementSchemas
*
* @param object
* @return
*/
List<MeasurementSchema> buildMeasurementSchemas(Object object);
/**
* MeasurementSchemaValuesDTO
*
* @param object
* @return
*/
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object);
}

View File

@ -0,0 +1,714 @@
package com.muyu.data.process.basic.service.impl;
import com.alibaba.fastjson.JSON;
import com.muyu.data.process.basic.config.IoTDBSessionConfig;
import com.muyu.data.process.basic.service.IoTDBService;
import com.muyu.data.process.domain.dto.IoTDbRecordAble;
import com.muyu.data.process.domain.dto.MeasurementSchemaValuesDTO;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.util.*;
import java.util.stream.Collectors;
/**
* @Author: zi run
* @Date 2024/9/28 23:38
* @Description IoTDB
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class IoTDBServiceImpl implements IoTDBService {
/**
* IoTDB
*/
private final IoTDBSessionConfig ioTDBSessionConfig;
/**
* Tablet IoTDB
*
* @param tablet Tablet
*/
@Override
public void insertTablet(Tablet tablet) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库tablet:[{}]", tablet);
sessionPool.insertTablet(tablet);
} catch (Exception e) {
log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage());
}
}
/**
* Tablets IoTDB
*
* @param tablets Map Tablets String Tablet
* Tablets IoTDB
*/
@Override
public void insertTablets(Map<String, Tablet> tablets) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库tablets:[{}]", tablets);
sessionPool.insertTablets(tablets);
} catch (Exception e) {
log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage());
}
}
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param values
*/
@Override
public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
sessionPool.insertRecord(deviceId, time, measurements, values);
} catch (Exception e) {
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
deviceId, time, measurements, values, e.getMessage());
}
}
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param time
* @param measurements
* @param types
* @param values
*/
@Override
public void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values);
sessionPool.insertRecord(deviceId, time, measurements, types, values);
} catch (Exception e) {
log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}",
deviceId, time, measurements, types, values, e.getMessage());
}
}
/**
* string
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
@Override
public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}",
deviceIds, times, measurementsList, valuesList, e.getMessage());
}
}
/**
*
*
* @param deviceIds root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
@Override
public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceIds:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList);
sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}",
deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
}
}
/**
* string
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param valuesList
*/
@Override
public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList);
sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}",
deviceId, times, measurementsList, valuesList, e.getMessage());
}
}
/**
*
*
* @param deviceId root.ln.wf01.wt01
* @param times
* @param measurementsList
* @param typesList
* @param valuesList
*/
@Override
public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据入库deviceId:[{}], measurementsList:[{}], typesList[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList);
sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
} catch (Exception e) {
log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
}
}
/**
*
*
* @param path root.ln.wf01.wt01.temperature
* @param endTime
*/
@Override
public void deleteData(String path, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据删除path:[{}], endTime:[{}]", path, endTime);
sessionPool.deleteData(path, endTime);
} catch (Exception e) {
log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
}
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param endTime
*/
@Override
public void deleteData(List<String> paths, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb数据删除paths:[{}], endTime:[{}]", paths, endTime);
sessionPool.deleteData(paths, endTime);
} catch (Exception e) {
log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
}
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param startTime
* @param endTime
* @param outTime
* @return SessionDataSet (Time,paths)
*/
@Override
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询paths:[{}], startTime[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime);
sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param startTime
* @param endTime
* @param outTime
* @param clazz
* @param <T>
* @return
*/
@Override
public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime, Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
}
return resultEntities;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @return SessionDataSet
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询paths:[{}], lastTime:[{}]", paths, lastTime);
sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}", paths, lastTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param <T>
* @param paths root.ln.wf01.wt01.temperature
* @param lastTime
* @param clazz
* @return
*/
@Override
public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime[{}], error={}", paths, lastTime, e.getMessage());
}
return resultEntities;
}
/**
*
*
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @return SessionDataSet
*/
@Override
public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb数据查询db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes);
sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* @param db root.ln.wf01
* @param device root.ln.wf01.wt01
* @param sensors temperaturestatus()
* @param isLegalPathNodes true()
* @param clazz
* @param <T>
* @return
*/
@Override
public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IoTDbRecordAble> clazz) {
SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
List<String> columnNames = sessionDataSet.getColumnNames();
List<T> resultEntities = null;
try {
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
} catch (Exception e) {
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
}
return resultEntities;
}
/**
*
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}]", paths, aggregations);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,error={}", paths, aggregations, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}]", paths, aggregations, startTime, endTime);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* ()
*
* @param paths root.ln.wf01.wt01.temperature
* @param aggregations TAggregationType.SUM,TAggregationType.COUNT
* @param startTime ()
* @param endTime
* @param interval
* @param slidingStep
* @return SessionDataSet
*/
@Override
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb聚合查询paths:[{}], aggregations[{}],startTime[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations[{}] ,startTime[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* SQL
*
* @param sql
* @return
*/
@Override
public SessionDataSet executeQueryStatement(String sql) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
SessionDataSetWrapper sessionDataSetWrapper = null;
try {
log.info("iotdb SQL查询sql:[{}]", sql);
sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
return sessionDataSetWrapper.getSessionDataSet();
} catch (Exception e) {
log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
} finally {
sessionPool.closeResultSet(sessionDataSetWrapper);
}
return null;
}
/**
* SQL
*
* @param sql
*/
@Override
public void executeNonQueryStatement(String sql) {
SessionPool sessionPool = ioTDBSessionConfig.getSessionPool();
try {
log.info("iotdb SQL无查询sql:[{}]", sql);
sessionPool.executeNonQueryStatement(sql);
} catch (Exception e) {
log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
}
}
/**
*
*
* @param sessionDataSet
* @param titleList
*/
@SneakyThrows
@Override
public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
int fetchSize = sessionDataSet.getFetchSize();
List<Map<String, Object>> resultList = new ArrayList<>();
titleList.remove("Time");
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
resultMap.put("time", timeString);
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
resultMap.put(splitString(titleList.get(i)), null);
} else {
resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString());
}
}
resultList.add(resultMap);
}
}
return resultList;
}
/**
*
*
* @param sessionDataSet
* @param titleList
* @param clazz
* @param <T>
* @return
*/
@SneakyThrows
@Override
public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IoTDbRecordAble> clazz) {
int fetchSize = sessionDataSet.getFetchSize();
List<T> resultList = new ArrayList<>();
titleList.remove("Time");
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
Map<String, Object> resultMap = new HashMap<>();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
resultMap.put("time", timeString);
if (titleList.stream().anyMatch(str -> str.contains("."))) {
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
String title = titleList.get(i);
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
resultMap.put(splitString(title), null);
} else {
resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
}
}
} else {
Field fieldName = fields.get(0);
Field fieldValue = fields.get(1);
Field fieldDataType = fields.get(2);
if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString());
resultMap.put(splitString(mapKey), mapValue);
}
}
String jsonString = JSON.toJSONString(resultMap);
resultList.add(JSON.parseObject(jsonString, (Type) clazz));
}
}
return resultList;
}
/**
*
*
* @param str
* @return
*/
public static String splitString(String str) {
String[] parts = str.split("\\.");
if (parts.length <= 0) {
return str;
} else {
return parts[parts.length - 1];
}
}
/**
*
*
* @param value
* @param typeName
* @return
*/
public static Object convertStringToType(String value, String typeName) {
String type = typeName.toLowerCase();
if (type.isEmpty()) {
return value;
}
if ("boolean".equals(type)) {
return Boolean.parseBoolean(value);
} else if ("double".equals(type)) {
return Double.parseDouble(value);
} else if ("int32".equals(type)) {
return Integer.parseInt(value);
} else if ("int64".equals(type)) {
return Long.parseLong(value);
} else if ("float".equals(type)) {
return Float.parseFloat(value);
} else if ("text".equals(type)) {
return value;
} else {
return value;
}
}
/**
* TSDataType
*
* @param type
* @return TSDataType
*/
public static TSDataType getTsDataTypeByString(String type) {
String typeName = splitString(type).toLowerCase();
if ("boolean".equals(typeName)) {
return TSDataType.BOOLEAN;
} else if ("double".equals(typeName)) {
return TSDataType.DOUBLE;
} else if ("int".equals(typeName) || "integer".equals(typeName)) {
return TSDataType.INT32;
} else if ("long".equals(typeName)) {
return TSDataType.INT64;
} else if ("float".equals(typeName)) {
return TSDataType.FLOAT;
} else if ("text".equals(typeName)) {
return TSDataType.TEXT;
} else if ("string".equals(typeName)) {
return TSDataType.TEXT;
} else {
return TSDataType.UNKNOWN;
}
}
/**
* MeasurementSchemas
*
* @param obj
* @return
*/
@Override
public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field ->
new MeasurementSchema(field.getName(),
getTsDataTypeByString(
field.getType().getName()
))).
collect(Collectors.toList());
return schemaList;
}
/**
* MeasurementSchemaValuesDTO
*
* @param obj
* @return
*/
@SneakyThrows
@Override
public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
List<MeasurementSchema> schemaList = new ArrayList<>();
List<Object> values = new ArrayList<>();
List<Integer> valuesIsNullIndex = new ArrayList<>();
int valueIndex = 0;
for (java.lang.reflect.Field field : fields) {
MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()));
schemaList.add(measurementSchema);
Object value = field.get(obj);
if (value == null) {
valuesIsNullIndex.add(valueIndex);
}
values.add(value);
valueIndex++;
}
measurementSchemaValuesDTO.setSchemaList(schemaList);
measurementSchemaValuesDTO.setValues(values);
return measurementSchemaValuesDTO;
}
}

View File

@ -0,0 +1,33 @@
package com.muyu.data.process.domain;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @Author: zi run
* @Date 2024/9/29 0:11
* @Description JSON
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
@Tag(name = "ionDB数据源对象")
public class DataJSON {
/**
*
*/
@Schema(name = "时间戳")
private Long timestamp;
/**
* JSON
*/
@Schema(name = "车辆JSON数据")
private String datasource;
}

View File

@ -0,0 +1,67 @@
package com.muyu.data.process.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 0:12
* @Description
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class InsertDataDTO {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
* InsertDataDTO
*
* @return InsertDataDTO
*/
public InsertDataDTO buildOne() {
InsertDataDTO insertDataDTO = new InsertDataDTO();
insertDataDTO.setHardware("ss");
insertDataDTO.setStatus(true);
insertDataDTO.setTemperature(12.0F);
return insertDataDTO;
}
/**
* InsertDataDTO
*
* @return InsertDataDTO
*/
public List<InsertDataDTO> buildList() {
List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
int buildNum = 10;
for (int i = 0; i < buildNum; i++) {
InsertDataDTO insertDataDTO = new InsertDataDTO();
insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
insertDataDTO.setStatus(i % 2 == 0);
insertDataDTO.setTemperature(12.0F + i);
insertDataDTOS.add(insertDataDTO);
}
return insertDataDTOS;
}
}

View File

@ -0,0 +1,36 @@
package com.muyu.data.process.domain;
import com.muyu.data.process.domain.dto.IoTDbRecordAble;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @Author: zi run
* @Date 2024/9/29 0:18
* @Description
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ResultEntity extends IoTDbRecordAble {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
*
*/
private String time;
}

View File

@ -0,0 +1,43 @@
package com.muyu.data.process.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author: zi run
* @Date 2024/9/29 0:22
* @Description
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TestDataType {
/**
*
*/
private Float temperature;
/**
*
*/
private String hardware;
/**
*
*/
private Boolean status;
/**
* Double
*/
private Double testDouble;
/**
* Long
*/
private Long testLong;
}

View File

@ -0,0 +1,12 @@
package com.muyu.data.process.domain.dto;
import lombok.Data;
/**
* @Author: zi run
* @Date 2024/9/29 0:23
* @Description IoTDB
*/
@Data
public class IoTDbRecordAble {
}

View File

@ -0,0 +1,36 @@
package com.muyu.data.process.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
/**
* @Author: zi run
* @Date 2024/9/29 0:26
* @Description
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class MeasurementSchemaValuesDTO {
/**
*
*/
private List<MeasurementSchema> schemaList;
/**
* schemaList
*/
private List<Object> values;
/**
*
*/
private List<Integer> valueIsNullIndex;
}

View File

@ -0,0 +1,2 @@
Spring Boot Version: ${spring-boot.version}
Spring Application Name: ${spring.application.name}

View File

@ -0,0 +1,45 @@
# Tomcat
server:
port: 11000
nacos:
addr: 106.15.136.7:8848
user-name: nacos
password: nacos
namespace: xzr
spring:
application:
# 应用名称
name: cloud-data-process
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: ${nacos.addr}
# nacos用户名
username: ${nacos.user-name}
# nacos密码
password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
config:
# 服务注册地址
server-addr: ${nacos.addr}
# nacos用户名
username: ${nacos.user-name}
# nacos密码
password: ${nacos.password}
# 命名空间
namespace: ${nacos.namespace}
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
# 系统共享配置
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
# 系统环境Config共享配置
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs/cloud-data-process"/>
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 系统模块日志级别控制 -->
<logger name="com.muyu" level="info"/>
<!-- Spring日志级别控制 -->
<logger name="org.springframework" level="warn"/>
<root level="info">
<appender-ref ref="console"/>
</root>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="file_info"/>
<appender-ref ref="file_error"/>
</root>
</configuration>

View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs/cloud-data-process"/>
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<property name="log.sky.pattern" value="%d{HH:mm:ss.SSS} %yellow([%tid]) [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.sky.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 使用gRpc将日志发送到skywalking服务端 -->
<appender name="GRPC_LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<Pattern>${log.sky.pattern}</Pattern>
</layout>
</encoder>
</appender>
<!-- 系统模块日志级别控制 -->
<logger name="com.muyu" level="info"/>
<!-- Spring日志级别控制 -->
<logger name="org.springframework" level="warn"/>
<root level="info">
<appender-ref ref="GRPC_LOG"/>
<appender-ref ref="console"/>
</root>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="file_info"/>
<appender-ref ref="file_error"/>
</root>
</configuration>

View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs/cloud-data-process"/>
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<property name="log.sky.pattern" value="%d{HH:mm:ss.SSS} %yellow([%tid]) [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.sky.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 使用gRpc将日志发送到skywalking服务端 -->
<appender name="GRPC_LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<Pattern>${log.sky.pattern}</Pattern>
</layout>
</encoder>
</appender>
<!-- 系统模块日志级别控制 -->
<logger name="com.muyu" level="info"/>
<!-- Spring日志级别控制 -->
<logger name="org.springframework" level="warn"/>
<root level="info">
<appender-ref ref="GRPC_LOG"/>
<appender-ref ref="console"/>
</root>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="file_info"/>
<appender-ref ref="file_error"/>
</root>
</configuration>

View File

@ -22,7 +22,6 @@
</description> </description>
<dependencies> <dependencies>
<!-- SpringCloud Alibaba Nacos --> <!-- SpringCloud Alibaba Nacos -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>

View File

@ -6,7 +6,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
/** /**
* *
* *
* @author muyu * @author muyu
*/ */

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
spring: spring:
application: application:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
# Spring # Spring
spring: spring:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
# Spring # Spring
spring: spring:

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
spring: spring:
application: application:

View File

@ -14,6 +14,7 @@
<module>cloud-modules-file</module> <module>cloud-modules-file</module>
<module>cloud-modules-enterprise</module> <module>cloud-modules-enterprise</module>
<module>cloud-weixin-mp</module> <module>cloud-weixin-mp</module>
<module>cloud-modules-data-process</module>
</modules> </modules>
<artifactId>cloud-modules</artifactId> <artifactId>cloud-modules</artifactId>

View File

@ -6,7 +6,7 @@ nacos:
addr: 106.15.136.7:8848 addr: 106.15.136.7:8848
user-name: nacos user-name: nacos
password: nacos password: nacos
namespace: dev namespace: xzr
# Spring # Spring
spring: spring:

View File

@ -44,6 +44,7 @@
<xxl-job-core.version>2.4.1</xxl-job-core.version> <xxl-job-core.version>2.4.1</xxl-job-core.version>
<swagger.an.jakarta.verison>2.2.8</swagger.an.jakarta.verison> <swagger.an.jakarta.verison>2.2.8</swagger.an.jakarta.verison>
<kafka.clients.verison>3.0.0</kafka.clients.verison> <kafka.clients.verison>3.0.0</kafka.clients.verison>
<iotdb-session.verison>1.3.1</iotdb-session.verison>
</properties> </properties>
<!-- 依赖声明 --> <!-- 依赖声明 -->
@ -199,6 +200,13 @@
<version>${kafka.clients.verison}</version> <version>${kafka.clients.verison}</version>
</dependency> </dependency>
<!-- IotDB会话-->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${iotdb-session.verison}</version>
</dependency>
<!-- 核心模块 --> <!-- 核心模块 -->
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>