Compare commits
8 Commits
63ebaee731
...
1d43fa7888
Author | SHA1 | Date |
---|---|---|
|
1d43fa7888 | |
|
61dc7f9277 | |
|
6eff930c84 | |
|
ecf81cc79e | |
|
dd94f2e39e | |
|
2084013449 | |
|
3ac5b84ef6 | |
|
1e19d99748 |
|
@ -7,13 +7,13 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
/**
|
||||
* 缓存基础
|
||||
* * @className: CacheBasic ️✈️
|
||||
* * @author: Yang 鹏 🦅
|
||||
* * @date: 2024/9/29 16:08 ⏰
|
||||
* * @Version: 1.0
|
||||
* * @description:
|
||||
* * @className: CacheBasic ️✈️
|
||||
* * @author: Yang 鹏 🦅
|
||||
* * @date: 2024/9/29 16:08 ⏰
|
||||
* * @Version: 1.0
|
||||
* * @description:
|
||||
*/
|
||||
public interface CacheBasic<K,V> extends PrimaryKeyBasic<K>{
|
||||
public interface CacheBasic<K, V> extends PrimaryKeyBasic<K> {
|
||||
void put(K key, V value);
|
||||
|
||||
V get(K key);
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-common-caffeine</artifactId>
|
||||
|
||||
<description>
|
||||
cloud-common-caffeine 本地缓存服务
|
||||
</description>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,39 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-common-iotdb</artifactId>
|
||||
|
||||
<description>
|
||||
cloud-common-iotdb 时序性数据存储服务
|
||||
</description>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.fastjson2</groupId>
|
||||
<artifactId>fastjson2</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,149 @@
|
|||
|
||||
package com.muyu.common.iotdb.config;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.iotdb.session.pool.SessionPool;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
import org.apache.iotdb.tsfile.read.common.Field;
|
||||
import org.apache.iotdb.tsfile.read.common.RowRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/30
|
||||
* @Description IotDBSessionConfig配置类
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@Configuration
|
||||
public class IotDBSessionConfig {
|
||||
|
||||
@Value("${spring.iotdb.username:root}")
|
||||
private String username;
|
||||
|
||||
@Value("${spring.iotdb.password:root}")
|
||||
private String password;
|
||||
|
||||
@Value("${spring.iotdb.ip:127.0.0.1}")
|
||||
private String ip;
|
||||
|
||||
@Value("${spring.iotdb.port:6667}")
|
||||
private int port;
|
||||
|
||||
@Value("${spring.iotdb.maxSize:10}")
|
||||
private int maxSize;
|
||||
|
||||
private static SessionPool sessionPool;
|
||||
|
||||
/**
|
||||
* 获取IotDBSession对象
|
||||
* @return iotDBSession对象
|
||||
*/
|
||||
public SessionPool getSessionPool() {
|
||||
if (sessionPool == null) {
|
||||
sessionPool = new SessionPool(ip, port, username, password, maxSize);
|
||||
}
|
||||
return sessionPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加数据
|
||||
* @param deviceId
|
||||
* @param time
|
||||
* @param measurements
|
||||
* @param values
|
||||
*/
|
||||
public void insertRecord(SessionPool sessionPool,String deviceId, long time, List<String> measurements, List<String> values) {
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
|
||||
deviceId, time, measurements, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public SessionDataSet selectRecord(SessionPool sessionPool,String sql) {
|
||||
log.info("iotdb数据查询:sql:[{}]",sql);
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb SQL查询:sql:[{}]", sql);
|
||||
sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 10);
|
||||
String ROOT_DATA_DATAJSON = "root.car.data.datajson";
|
||||
String SELECT_ROOT_DATA_DATAJSON_DATASOURCE = "select * from root.car.data.datajson";
|
||||
|
||||
String jsonValue = """
|
||||
{
|
||||
"name": "张三",
|
||||
"age": 28,
|
||||
"email": "zhangsan@example.com",
|
||||
"isStudent": false,
|
||||
"hobbies": ["阅读", "旅行", "编程"],
|
||||
"address": {
|
||||
"street": "长安街100号",
|
||||
"city": "北京",
|
||||
"postalCode": "100000"
|
||||
}
|
||||
}
|
||||
""";
|
||||
|
||||
IotDBSessionConfig iotDBSessionConfig = new IotDBSessionConfig();
|
||||
|
||||
List<String> values = new ArrayList<>();
|
||||
values.add(jsonValue);
|
||||
ArrayList<String> objects = new ArrayList<>();
|
||||
objects.add("datasource");
|
||||
iotDBSessionConfig.insertRecord(sessionPool,ROOT_DATA_DATAJSON,System.currentTimeMillis(),objects,values);
|
||||
|
||||
SessionDataSet sessionDataSet = iotDBSessionConfig.selectRecord(sessionPool,SELECT_ROOT_DATA_DATAJSON_DATASOURCE);
|
||||
|
||||
HashMap<Long, Map<String, String>> longMapHashMap = new HashMap<>();
|
||||
|
||||
try {
|
||||
while (sessionDataSet.hasNext()){
|
||||
RowRecord next = sessionDataSet.next();
|
||||
long timestamp = next.getTimestamp();
|
||||
Map<String, String> fieldMap = new HashMap<>();
|
||||
for (Field field : next.getFields()) {
|
||||
TSDataType dataType = field.getDataType();
|
||||
String stringValue = field.getStringValue();
|
||||
fieldMap.put(dataType.name(), stringValue);
|
||||
}
|
||||
longMapHashMap.put(timestamp, fieldMap);
|
||||
}
|
||||
} catch (StatementExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IoTDBConnectionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
log.info("数据为:{}", JSONObject.toJSONString(longMapHashMap));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
package com.muyu.common.iotdb.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件驱动对象
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class EventActuate {
|
||||
/**
|
||||
* json数据
|
||||
*/
|
||||
private String jsonData;
|
||||
/**
|
||||
* 事件驱动key集合
|
||||
*/
|
||||
private List<String> eventKeys;
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.muyu.common.iotdb.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class InsertDataDTO {
|
||||
private Float temperature;
|
||||
private String hardware;
|
||||
private Boolean status;
|
||||
|
||||
public InsertDataDTO buildOne() {
|
||||
InsertDataDTO insertDataDTO = new InsertDataDTO();
|
||||
insertDataDTO.setHardware("ss");
|
||||
insertDataDTO.setStatus(true);
|
||||
insertDataDTO.setTemperature(12.0F);
|
||||
return insertDataDTO;
|
||||
}
|
||||
|
||||
public List<InsertDataDTO> buildList() {
|
||||
List<InsertDataDTO> insertDataDTOS = new ArrayList<>();
|
||||
int buildNum = 10;
|
||||
for (int i = 0; i < buildNum; i++) {
|
||||
InsertDataDTO insertDataDTO = new InsertDataDTO();
|
||||
insertDataDTO.setHardware(i % 2 == 0 ? "pp" + i : null);
|
||||
insertDataDTO.setStatus(i % 2 == 0);
|
||||
insertDataDTO.setTemperature(12.0F + i);
|
||||
insertDataDTOS.add(insertDataDTO);
|
||||
}
|
||||
return insertDataDTOS;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package com.muyu.common.iotdb.domain;
|
||||
|
||||
|
||||
import com.muyu.common.iotdb.domain.dto.IotDbRecordAble;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class ResultEntity extends IotDbRecordAble {
|
||||
|
||||
private Float temperature;
|
||||
|
||||
private String hardware;
|
||||
|
||||
private Boolean status;
|
||||
|
||||
private String time;
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.muyu.common.iotdb.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class TestDataType {
|
||||
private Float temperature;
|
||||
private String hardware;
|
||||
private Boolean status;
|
||||
private Double testDouble;
|
||||
private Long testLong;
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.muyu.common.iotdb.domain.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/30
|
||||
* @Description IotDBServiceImpl业务实现层
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
|
||||
@Data
|
||||
public class IotDbRecordAble {
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package com.muyu.common.iotdb.domain.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class MeasurementSchemaValuesDTO {
|
||||
|
||||
private List<MeasurementSchema> schemaList;
|
||||
|
||||
private List<Object> values;
|
||||
|
||||
private List<Integer> valueIsNullIndex;
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
package com.muyu.common.iotdb.service;
|
||||
|
||||
import com.muyu.common.iotdb.domain.dto.IotDbRecordAble;
|
||||
import com.muyu.common.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
import org.apache.iotdb.tsfile.write.record.Tablet;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/28
|
||||
* @Description IotDBServiceImpl业务层
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public interface IotDBService {
|
||||
|
||||
void insertTablet(Tablet tablet);
|
||||
|
||||
void insertTablets(Map<String, Tablet> tablets);
|
||||
|
||||
void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values);
|
||||
|
||||
void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values);
|
||||
|
||||
void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
|
||||
|
||||
void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
|
||||
|
||||
void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList);
|
||||
|
||||
void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList);
|
||||
|
||||
void deleteData(String path, long endTime);
|
||||
|
||||
void deleteData(List<String> paths, long endTime);
|
||||
|
||||
SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut);
|
||||
|
||||
<T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut, Class<? extends IotDbRecordAble> clazz);
|
||||
|
||||
SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
|
||||
|
||||
<T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz);
|
||||
|
||||
SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes);
|
||||
|
||||
<T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz);
|
||||
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations);
|
||||
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime);
|
||||
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval);
|
||||
|
||||
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep);
|
||||
|
||||
SessionDataSet executeQueryStatement(String sql);
|
||||
|
||||
|
||||
/**
|
||||
* SQL非查询
|
||||
*
|
||||
* @param sql
|
||||
*/
|
||||
void executeNonQueryStatement(String sql);
|
||||
|
||||
/**
|
||||
* 封装处理数据
|
||||
*
|
||||
* @param sessionDataSet
|
||||
* @param titleList
|
||||
*/
|
||||
List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> columnNames);
|
||||
|
||||
/**
|
||||
* 封装处理数据(不支持聚合查询)
|
||||
*
|
||||
* @param sessionDataSet 查询返回的结果集
|
||||
* @param titleList 查询返回的结果集内的字段名
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
<T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble> clazz);
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemas
|
||||
*
|
||||
* @param object 对象
|
||||
* @return
|
||||
*/
|
||||
List<MeasurementSchema> buildMeasurementSchemas(Object object);
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemaValuesDTO
|
||||
*
|
||||
* @param object 对象
|
||||
* @return
|
||||
*/
|
||||
MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object object);
|
||||
}
|
|
@ -0,0 +1,711 @@
|
|||
package com.muyu.common.iotdb.service.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.muyu.common.iotdb.config.IotDBSessionConfig;
|
||||
import com.muyu.common.iotdb.domain.dto.IotDbRecordAble;
|
||||
import com.muyu.common.iotdb.domain.dto.MeasurementSchemaValuesDTO;
|
||||
import com.muyu.common.iotdb.service.IotDBService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
|
||||
import org.apache.iotdb.isession.SessionDataSet;
|
||||
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
|
||||
import org.apache.iotdb.session.pool.SessionPool;
|
||||
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
|
||||
import org.apache.iotdb.tsfile.read.common.Field;
|
||||
import org.apache.iotdb.tsfile.read.common.RowRecord;
|
||||
import org.apache.iotdb.tsfile.write.record.Tablet;
|
||||
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/28
|
||||
* @Description IotDBServiceImpl业务实现层
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class IotDBServiceImpl implements IotDBService {
|
||||
|
||||
@Resource
|
||||
private IotDBSessionConfig iotDBSessionConfig;
|
||||
|
||||
/**
|
||||
* 单设备批量插入数据
|
||||
*
|
||||
* @param tablet
|
||||
*/
|
||||
@Override
|
||||
public void insertTablet(Tablet tablet) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:tablet:[{}]", tablet);
|
||||
sessionPool.insertTablet(tablet);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertTablet失败: tablet={}, error={}", tablet, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 多设备批量插入数据
|
||||
*
|
||||
* @param tablets
|
||||
*/
|
||||
@Override
|
||||
public void insertTablets(Map<String, Tablet> tablets) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:tablets:[{}]", tablets);
|
||||
sessionPool.insertTablets(tablets);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertTablets失败: tablets={}, error={}", tablets, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecord(String deviceId, long time, List<String> measurements, List<String> values) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecord失败: deviceId={}, time={}, measurements={}, values={}, error={}",
|
||||
deviceId, time, measurements, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 设备名(表名)root.ln.wf01.wt01
|
||||
* @param time 时间戳
|
||||
* @param measurements 数据项列表
|
||||
* @param types 数据项对应类型列表
|
||||
* @param values 数据项对应值列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:device_id:[{}], measurements:[{}], types:[{}], values:[{}]", deviceId, measurements, types, values);
|
||||
sessionPool.insertRecord(deviceId, time, measurements, types, values);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecordHasTypes失败: deviceId={}, time={}, measurements={},types={}, values={}, error={}",
|
||||
deviceId, time, measurements, types, values, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名)root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], valuesList:[{}]", deviceIds, measurementsList, valuesList);
|
||||
sessionPool.insertRecords(deviceIds, times, measurementsList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, valuesList={}, error={}",
|
||||
deviceIds, times, measurementsList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 多个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceIds 多个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceIds:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceIds, measurementsList, typesList, valuesList);
|
||||
sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecords失败: deviceIds={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}",
|
||||
deviceIds, times, measurementsList, typesList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(string类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], valuesList:[{}]", deviceId, measurementsList, valuesList);
|
||||
sessionPool.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertStringRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, valuesList={}, error={}",
|
||||
deviceId, times, measurementsList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 单个设备多条数据插入(不同类型数据项)
|
||||
*
|
||||
* @param deviceId 单个设备名(表名))root.ln.wf01.wt01
|
||||
* @param times 时间戳的列表
|
||||
* @param measurementsList 数据项列表的列表
|
||||
* @param typesList 数据项对应类型列表的列表
|
||||
* @param valuesList 数据项对应值列表的列表
|
||||
*/
|
||||
@Override
|
||||
public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据入库:deviceId:[{}], measurementsList:[{}], typesList:[{}], valuesList:[{}]", deviceId, measurementsList, typesList, valuesList);
|
||||
sessionPool.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession insertRecordsOfOneDevice失败: deviceId={}, times={}, measurementsList={}, typesList=[],valuesList={}, error={}", deviceId, times, measurementsList, typesList, valuesList, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据(删除一个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param path 单个字段 root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
@Override
|
||||
public void deleteData(String path, long endTime) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据删除:path:[{}], endTime:[{}]", path, endTime);
|
||||
sessionPool.deleteData(path, endTime);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession deleteData失败: deviceId={}, times={},error={}", path, endTime, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除数据(删除多个时间序列在某个时间点前或这个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param endTime 删除时间点
|
||||
*/
|
||||
@Override
|
||||
public void deleteData(List<String> paths, long endTime) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb数据删除:paths:[{}], endTime:[{}]", paths, endTime);
|
||||
sessionPool.deleteData(paths, endTime);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession deleteData失败: paths={}, times={},error={}", paths, endTime, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param outTime 超时时间
|
||||
* @return SessionDataSet (Time,paths)
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:paths:[{}], startTime:[{}], endTime:[{}],outTime:[{}]", paths, startTime, endTime, outTime);
|
||||
sessionDataSetWrapper = sessionPool.executeRawDataQuery(paths, startTime, endTime, outTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param outTime 超时时间
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeRawDataQuery(List<String> paths, long startTime, long endTime, long outTime, Class<? extends IotDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeRawDataQuery(paths, startTime, endTime, outTime);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeRawDataQuery失败: paths={}, startTime:[{}], endTime:[{}],outTime:[{}],error={}", paths, startTime, endTime, outTime, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:paths:[{}], lastTime:[{}]", paths, lastTime);
|
||||
sessionDataSetWrapper = sessionPool.executeLastDataQuery(paths, lastTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(查询最后一条时间戳大于等于某个时间点的数据)
|
||||
*
|
||||
* @param <T>
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param lastTime 结束时间
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeLastDataQuery(List<String> paths, long lastTime, Class<? extends IotDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeLastDataQuery(paths, lastTime);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQuery失败: paths={}, lastTime:[{}], error={}", paths, lastTime, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 最新点查询(快速查询单设备下指定序列最新点)
|
||||
*
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb数据查询:db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}]", db, device, sensors, isLegalPathNodes);
|
||||
sessionDataSetWrapper = sessionPool.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param db root.ln.wf01
|
||||
* @param device root.ln.wf01.wt01
|
||||
* @param sensors temperature,status(字段名)
|
||||
* @param isLegalPathNodes true(避免路径校验)
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public <T> List<T> executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes, Class<? extends IotDbRecordAble> clazz) {
|
||||
SessionDataSet sessionDataSet = executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
|
||||
List<String> columnNames = sessionDataSet.getColumnNames();
|
||||
List<T> resultEntities = null;
|
||||
try {
|
||||
resultEntities = packagingObjectData(sessionDataSet, columnNames, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeLastDataQueryForOneDevice失败: db:[{}], device:[{}],sensors:[{}], isLegalPathNodes:[{}], error={}", db, device, sensors, isLegalPathNodes, e.getMessage());
|
||||
}
|
||||
return resultEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}]", paths, aggregations);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,error={}", paths, aggregations, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(时间序列原始数据范围查询,时间范围为左闭右开区间,包含开始时间但不包含结束时间)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}]", paths, aggregations, startTime, endTime);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}],error={}", paths, aggregations, startTime, endTime, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}]", paths, aggregations, startTime, endTime, interval);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], error={}", paths, aggregations, startTime, endTime, interval, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 聚合查询(支持按照时间区间分段查询)
|
||||
*
|
||||
* @param paths 多个字段(表名)) root.ln.wf01.wt01.temperature
|
||||
* @param aggregations 聚合操作 TAggregationType.SUM,TAggregationType.COUNT
|
||||
* @param startTime 开始时间(包含)
|
||||
* @param endTime 结束时间
|
||||
* @param interval
|
||||
* @param slidingStep
|
||||
* @return SessionDataSet
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
|
||||
try {
|
||||
log.info("iotdb聚合查询:paths:[{}], aggregations:[{}],startTime:[{}], endTime:[{}] ,interval:[{}], slidingStep:[{}]", paths, aggregations, startTime, endTime, interval, slidingStep);
|
||||
sessionDataSetWrapper = sessionPool.executeAggregationQuery(paths, aggregations, startTime, endTime, interval, slidingStep);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeAggregationQuery失败: paths:[{}], aggregations:[{}] ,startTime:[{}], endTime:[{}], interval:[{}], slidingStep:[{}] ,error={}", paths, aggregations, startTime, endTime, interval, slidingStep, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL查询
|
||||
*
|
||||
* @param sql
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public SessionDataSet executeQueryStatement(String sql) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
SessionDataSetWrapper sessionDataSetWrapper = null;
|
||||
|
||||
try {
|
||||
log.info("iotdb SQL查询:sql:[{}]", sql);
|
||||
sessionDataSetWrapper = sessionPool.executeQueryStatement(sql);
|
||||
return sessionDataSetWrapper.getSessionDataSet();
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
|
||||
} finally {
|
||||
sessionPool.closeResultSet(sessionDataSetWrapper);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* SQL非查询
|
||||
*
|
||||
* @param sql
|
||||
*/
|
||||
@Override
|
||||
public void executeNonQueryStatement(String sql) {
|
||||
SessionPool sessionPool = iotDBSessionConfig.getSessionPool();
|
||||
try {
|
||||
log.info("iotdb SQL无查询:sql:[{}]", sql);
|
||||
sessionPool.executeNonQueryStatement(sql);
|
||||
} catch (Exception e) {
|
||||
log.error("IotDBSession executeNonQueryStatement失败:sql:[{}],error={}", sql, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 封装处理数据
|
||||
*
|
||||
* @param sessionDataSet
|
||||
* @param titleList
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public List<Map<String, Object>> packagingMapData(SessionDataSet sessionDataSet, List<String> titleList) {
|
||||
int fetchSize = sessionDataSet.getFetchSize();
|
||||
List<Map<String, Object>> resultList = new ArrayList<>();
|
||||
titleList.remove("Time");
|
||||
if (fetchSize > 0) {
|
||||
while (sessionDataSet.hasNext()) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
RowRecord next = sessionDataSet.next();
|
||||
List<Field> fields = next.getFields();
|
||||
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
|
||||
resultMap.put("time", timeString);
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Field field = fields.get(i);
|
||||
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
|
||||
resultMap.put(splitString(titleList.get(i)), null);
|
||||
} else {
|
||||
resultMap.put(splitString(titleList.get(i)), field.getObjectValue(field.getDataType()).toString());
|
||||
}
|
||||
}
|
||||
resultList.add(resultMap);
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 封装处理数据(不支持聚合查询)
|
||||
*
|
||||
* @param sessionDataSet 查询返回的结果集
|
||||
* @param titleList 查询返回的结果集内的字段名
|
||||
* @param clazz 返回数据对应的对象(对象属性必须与字段名对应)
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public <T> List<T> packagingObjectData(SessionDataSet sessionDataSet, List<String> titleList, Class<? extends IotDbRecordAble> clazz) {
|
||||
int fetchSize = sessionDataSet.getFetchSize();
|
||||
List<T> resultList = new ArrayList<>();
|
||||
titleList.remove("Time");
|
||||
if (fetchSize > 0) {
|
||||
while (sessionDataSet.hasNext()) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
RowRecord next = sessionDataSet.next();
|
||||
List<Field> fields = next.getFields();
|
||||
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
|
||||
resultMap.put("time", timeString);
|
||||
if (titleList.stream().anyMatch(str -> str.contains("."))) {
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Field field = fields.get(i);
|
||||
String title = titleList.get(i);
|
||||
if (field.getDataType() == null || field.getObjectValue(field.getDataType()) == null) {
|
||||
resultMap.put(splitString(title), null);
|
||||
} else {
|
||||
resultMap.put(splitString(title), field.getObjectValue(field.getDataType()).toString());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Field fieldName = fields.get(0);
|
||||
Field fieldValue = fields.get(1);
|
||||
Field fieldDataType = fields.get(2);
|
||||
if (fieldName.getDataType() != null && fieldName.getObjectValue(fieldName.getDataType()) != null) {
|
||||
String mapKey = fieldName.getObjectValue(fieldName.getDataType()).toString();
|
||||
Object mapValue = convertStringToType(fieldValue.getObjectValue(fieldValue.getDataType()).toString(), fieldDataType.getObjectValue(fieldDataType.getDataType()).toString());
|
||||
resultMap.put(splitString(mapKey), mapValue);
|
||||
}
|
||||
}
|
||||
|
||||
String jsonString = JSON.toJSONString(resultMap);
|
||||
resultList.add(JSON.parseObject(jsonString, (Type) clazz));
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 分割获取字段名
|
||||
*
|
||||
* @param str
|
||||
* @return 字段名
|
||||
*/
|
||||
public static String splitString(String str) {
|
||||
String[] parts = str.split("\\.");
|
||||
if (parts.length <= 0) {
|
||||
return str;
|
||||
} else {
|
||||
return parts[parts.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据数据值和数据类型返回对应数据类型数据
|
||||
*
|
||||
* @param value 数据值
|
||||
* @param typeName 数据类型
|
||||
* @return 转换后的数据值
|
||||
*/
|
||||
public static Object convertStringToType(String value, String typeName) {
|
||||
String type = typeName.toLowerCase();
|
||||
if (type.isEmpty()) {
|
||||
return value;
|
||||
}
|
||||
if ("boolean".equals(type)) {
|
||||
return Boolean.parseBoolean(value);
|
||||
} else if ("double".equals(type)) {
|
||||
return Double.parseDouble(value);
|
||||
} else if ("int32".equals(type)) {
|
||||
return Integer.parseInt(value);
|
||||
} else if ("int64".equals(type)) {
|
||||
return Long.parseLong(value);
|
||||
} else if ("float".equals(type)) {
|
||||
return Float.parseFloat(value);
|
||||
} else if ("text".equals(type)) {
|
||||
return value;
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象属性的数据类型返回对应的TSDataType
|
||||
*
|
||||
* @param type 属性的数据类型
|
||||
* @return TSDataType
|
||||
*/
|
||||
public static TSDataType getTsDataTypeByString(String type) {
|
||||
String typeName = splitString(type).toLowerCase();
|
||||
if ("boolean".equals(typeName)) {
|
||||
return TSDataType.BOOLEAN;
|
||||
} else if ("double".equals(typeName)) {
|
||||
return TSDataType.DOUBLE;
|
||||
} else if ("int".equals(typeName) || "integer".equals(typeName)) {
|
||||
return TSDataType.INT32;
|
||||
} else if ("long".equals(typeName)) {
|
||||
return TSDataType.INT64;
|
||||
} else if ("float".equals(typeName)) {
|
||||
return TSDataType.FLOAT;
|
||||
} else if ("text".equals(typeName)) {
|
||||
return TSDataType.TEXT;
|
||||
} else if ("string".equals(typeName)) {
|
||||
return TSDataType.TEXT;
|
||||
} else {
|
||||
return TSDataType.UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemas
|
||||
*
|
||||
* @param obj 对象
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public List<MeasurementSchema> buildMeasurementSchemas(Object obj) {
|
||||
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
|
||||
List<MeasurementSchema> schemaList = Arrays.stream(fields).map(field ->
|
||||
new MeasurementSchema(field.getName(),
|
||||
getTsDataTypeByString(
|
||||
field.getType().getName()
|
||||
))).
|
||||
collect(Collectors.toList());
|
||||
return schemaList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据对象构建MeasurementSchemaValuesDTO
|
||||
*
|
||||
* @param obj 对象
|
||||
* @return
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public MeasurementSchemaValuesDTO buildMeasurementSchemasAndValues(Object obj) {
|
||||
MeasurementSchemaValuesDTO measurementSchemaValuesDTO = new MeasurementSchemaValuesDTO();
|
||||
java.lang.reflect.Field[] fields = obj.getClass().getDeclaredFields();
|
||||
List<MeasurementSchema> schemaList = new ArrayList<>();
|
||||
List<Object> values = new ArrayList<>();
|
||||
List<Integer> valuesIsNullIndex = new ArrayList<>();
|
||||
int valueIndex = 0;
|
||||
for (java.lang.reflect.Field field : fields) {
|
||||
MeasurementSchema measurementSchema = new MeasurementSchema(field.getName(), getTsDataTypeByString(field.getType().getName()));
|
||||
schemaList.add(measurementSchema);
|
||||
Object value = field.get(obj);
|
||||
if (value == null) {
|
||||
valuesIsNullIndex.add(valueIndex);
|
||||
}
|
||||
values.add(value);
|
||||
valueIndex++;
|
||||
}
|
||||
measurementSchemaValuesDTO.setSchemaList(schemaList);
|
||||
measurementSchemaValuesDTO.setValues(values);
|
||||
return measurementSchemaValuesDTO;
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
com.muyu.data.iotdb.config.IotDBSessionConfig
|
|
@ -17,6 +17,10 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<description>
|
||||
cloud-common-kafka kafka公共模块
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<!-- 项目公共核心模块 -->
|
||||
<dependency>
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<description>
|
||||
cloud-common-rabbit 消息队列服务
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- rabbitMq 消息队列 -->
|
||||
|
@ -28,8 +32,8 @@
|
|||
<!-- 项目公共核心 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-core</artifactId>
|
||||
<artifactId>cloud-common-redis</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
</project>
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
package com.muyu.common.rabbit;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
|
||||
|
||||
@Configuration
|
||||
public class RabbitListenerConfigurer implements org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer {
|
||||
|
||||
static {
|
||||
System.setProperty("spring.amqp.deserialization.trust.all", "true");
|
||||
}
|
||||
|
||||
//以下配置RabbitMQ消息服务
|
||||
@Autowired
|
||||
public ConnectionFactory connectionFactory;
|
||||
|
||||
|
||||
/**
|
||||
* 处理器方法工厂
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
|
||||
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
|
||||
// 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
|
||||
factory.setMessageConverter(new MappingJackson2MessageConverter());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
|
||||
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
|
||||
import com.muyu.common.rabbit.constants.RabbitmqConstants;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* @ClassName: DelayedQueueConfig
|
||||
* @Description: 延迟队列配置类
|
||||
*/
|
||||
@Configuration
|
||||
public class DelayedQueueConfig {
|
||||
|
||||
|
||||
@Resource
|
||||
private RabbitAdmin rabbitAdmin;
|
||||
|
||||
/**
|
||||
* 声明队列
|
||||
* @return 返回队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue delayedQueue() {
|
||||
Queue queue = new Queue(RabbitmqConstants.DELAYED_QUEUE_NAME);
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明交换机
|
||||
* @return 返回交换机
|
||||
*/
|
||||
@Bean
|
||||
public Exchange delayedExchange() {
|
||||
HashMap<String, Object> arguments = new HashMap<>(3);
|
||||
|
||||
arguments.put("x-delayed-type", "direct");
|
||||
|
||||
/**
|
||||
* 声明自定义交换机
|
||||
* 第一个参数:交换机的名称
|
||||
* 第二个参数:交换机的类型
|
||||
* 第三个参数:是否需要持久化
|
||||
* 第四个参数:是否自动删除
|
||||
* 第五个参数:其他参数
|
||||
*/
|
||||
CustomExchange customExchange = new CustomExchange(
|
||||
RabbitmqConstants.DELAYED_EXCHANGE_NAME,
|
||||
"x-delayed-message",
|
||||
true,
|
||||
false,
|
||||
arguments);
|
||||
rabbitAdmin.declareExchange(customExchange);
|
||||
return customExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定交换机
|
||||
* @param delayedQueue 队列对象
|
||||
* @param delayedExchange 交换机对象
|
||||
*/
|
||||
@Bean
|
||||
public Binding delayedQueueBindingDelayedExchange(
|
||||
@Qualifier("delayedQueue") Queue delayedQueue,
|
||||
@Qualifier("delayedExchange") Exchange delayedExchange) {
|
||||
|
||||
Binding noargs = BindingBuilder.bind(delayedQueue)
|
||||
.to(delayedExchange)
|
||||
.with(RabbitmqConstants.DELAYED_ROUTING_KEY)
|
||||
.noargs();
|
||||
rabbitAdmin.declareBinding(noargs);
|
||||
return noargs;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* @ClassName:
|
||||
* @Description: 消息发送到 交换机的确认 回调方法
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
|
||||
|
||||
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
// public MyConfirmCallback(RabbitTemplate rabbitTemplate) {
|
||||
// this.rabbitTemplate = rabbitTemplate;
|
||||
// // 设置 消息发送到交换机成功 的回调
|
||||
// this.rabbitTemplate.setConfirmCallback(this);
|
||||
// }
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.rabbitTemplate.setConfirmCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息到交换机的回调方法 消息发送成功或者失败都会执行
|
||||
*
|
||||
* @param correlationData correlation data for the callback. 消息的元数据
|
||||
* @param ack true for ack, false for nack
|
||||
* @param cause An optional cause, for nack, when available, otherwise null.
|
||||
*/
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
if (ack) {
|
||||
System.out.println("消息发送到交换机成功~");
|
||||
} else {
|
||||
System.out.println("消息发送到交换机失败,失败的原因:" + cause);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @ClassName: RabbitAdminConfig
|
||||
* @Description: RabbitAdmin配置类
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitAdminConfig {
|
||||
@Value("${spring.rabbitmq.host}")
|
||||
private String host;
|
||||
@Value("${spring.rabbitmq.username}")
|
||||
private String username;
|
||||
@Value("${spring.rabbitmq.password}")
|
||||
private String password;
|
||||
@Value("${spring.rabbitmq.virtualhost}")
|
||||
private String virtualHost;
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory() {
|
||||
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
|
||||
cachingConnectionFactory.setHost(host);
|
||||
cachingConnectionFactory.setUsername(username);
|
||||
cachingConnectionFactory.setPassword(password);
|
||||
cachingConnectionFactory.setVirtualHost(virtualHost);
|
||||
return cachingConnectionFactory;
|
||||
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
|
||||
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||
rabbitAdmin.setAutoStartup(true);
|
||||
return rabbitAdmin;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* JSON 消息转换器 自动将发送的消息转换成 json 字符串 并且 消费者接收到消息的时候自动反序列化 成需要的对象
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitmqConfig {
|
||||
|
||||
|
||||
// 消息转换配置
|
||||
@Bean
|
||||
public MessageConverter jsonMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.muyu.common.rabbit.config;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 消息发送到 队列的确认
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
|
||||
|
||||
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
@PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
|
||||
public void init() {
|
||||
rabbitTemplate.setReturnsCallback(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息发送到 队列失败的时候执行
|
||||
*
|
||||
* @param returnedMessage the returned message and metadata.
|
||||
*/
|
||||
@Override
|
||||
public void returnedMessage(ReturnedMessage returnedMessage) {
|
||||
System.out.println("消息" + returnedMessage.getMessage().toString() +
|
||||
"被交换机" + returnedMessage.getExchange() + "回退!"
|
||||
+ "退回原因为:" + returnedMessage.getReplyText());
|
||||
// 回退了所有的信息,可做补偿机制 记录发送的日志
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.muyu.common.rabbit.constants;
|
||||
|
||||
/**
|
||||
* @Author: WangXin
|
||||
* @date: 2024/7/10
|
||||
* @Description: rabbitmq常量
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public class RabbitmqConstants {
|
||||
|
||||
//普通队列
|
||||
public static final String BASIC_QUEUE_NAME = "BASIC_QUEUE_NAME";
|
||||
|
||||
public static final String lOG_QUEUE_NAME = "LOG_QUEUE_NAME";
|
||||
//延迟队列
|
||||
//队列名称
|
||||
public static final String DELAYED_QUEUE_NAME = "delayed_queue";
|
||||
//交换机名称
|
||||
public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";
|
||||
//交换机
|
||||
public static final String DELAYED_ROUTING_KEY = "delayed";
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package com.muyu.common.rabbit.consumer;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.muyu.common.redis.service.RedisService;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @ClassName: RabbitMQConsumerUtil
|
||||
* @Description: rabbitmq消费者
|
||||
*/
|
||||
@Component
|
||||
@Log4j2
|
||||
@AllArgsConstructor
|
||||
public class RabbitMQConsumerUtil {
|
||||
|
||||
private final RedisService redisService;
|
||||
|
||||
|
||||
/**
|
||||
* 普通消费者
|
||||
* @param data 数据类型
|
||||
* @param message
|
||||
* @param channel
|
||||
*/
|
||||
public void rabbitMQBasicConsumer(Object data ,Message message , Channel channel) {
|
||||
log.info("当前时间:{} :RabbitMQConsumerUtil : {}", new Date(), message);
|
||||
try {
|
||||
// 获取到消息 开始消费
|
||||
log.info("消息消费者接收到消息,消息内容:{}", JSONObject.toJSONString(data));
|
||||
|
||||
|
||||
Long add = redisService.redisTemplate.opsForSet().add(data, message.getMessageProperties().getMessageId());
|
||||
|
||||
if (add != 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* -----------------------------------以下为异步业务操作----------------------------
|
||||
*/
|
||||
|
||||
/**
|
||||
* ------------------------------------------------------------------------------
|
||||
*/
|
||||
// 消费消息成功之后需要确认
|
||||
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
||||
// boolean multiple 是否批量确认 true 批量 确认小于等于当前投递序号的消息 false 单个确认
|
||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||
log.info("xxx消费者接收到消息,消息内容:{},消费成功...", message);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("xxx消费者接收到消息,消息内容:{},消费消息异常,异常信息:{}", message, e);
|
||||
// 消息回退 拒绝消费消息
|
||||
// long deliveryTag 消息投递序号 自增的数字 在整个队列中唯一 拿到这个序号就相当于拿到这条消息
|
||||
// boolean requeue 是否回到原来的队列
|
||||
try {
|
||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
|
||||
} catch (IOException ex) {
|
||||
log.error("xxx消费者接收到消息,消息内容:{},回退消息异常,异常信息:{}", message, ex);
|
||||
}
|
||||
}finally {
|
||||
try {
|
||||
channel.close();
|
||||
} catch (Exception e) {
|
||||
log.error("xxx消费者关闭Channel异常,消息内容:{},异常信息:{}", message, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,174 @@
|
|||
package com.muyu.common.rabbit.producer;
|
||||
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.rabbit.constants.RabbitmqConstants;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @ClassName: RabbitMQProducer
|
||||
* @Description: rabbitmq生产者
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
@Log4j2
|
||||
public class RabbitMQProducerUtil {
|
||||
//redis工具类对象
|
||||
|
||||
//rabbit
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
/**
|
||||
* 简单模型
|
||||
*
|
||||
* @param param 传递的消息 (如果是对象需要序列化)
|
||||
* @return 结果集
|
||||
* 一对一消费,只有一个消费者能接收到
|
||||
*/
|
||||
public Result<?> basicSendMessage(String queueName, Object param, String msg) {
|
||||
|
||||
log.info("【简单模型mq】 : method: 【 basicSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", RabbitmqConstants.BASIC_QUEUE_NAME, param, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(queueName, param, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【简单模型mq】 : method: 【 basicSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", RabbitmqConstants.BASIC_QUEUE_NAME);
|
||||
|
||||
return Result.success(msg!=null?msg:"消息发送成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* Work queue 工作模型
|
||||
*
|
||||
* @param obj 传递的消息 (如果是对象需要序列化)
|
||||
* @return 结果集
|
||||
* 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置 能者多劳模式(),谁完成的快,谁多做一点
|
||||
*/
|
||||
public Result<?> workSendMessage(String queueName, Object obj, String msg) {
|
||||
|
||||
log.info("【工作模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", queueName, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(queueName, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【工作模型mq】 : method: 【 workSendMessage 】- queue: 【 {} 】 ---> 【 消息发送成功 】", queueName);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish/Subscribe 发布订阅者模型
|
||||
* 多个消费者,多个消费者可以同时接收到消息 有交换机 类型 fanout
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> publishSubscribeSendMessage(String exchange, Object obj, String msg) {
|
||||
|
||||
log.info("【订阅模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: exchange 交换机的名称
|
||||
// 第二个参数: 绑定规则 发布订阅者模型 不写 默认 "" 只要绑定就行 不需要规则
|
||||
// 第三个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, "", obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【订阅模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
/**
|
||||
* Routing路由模型
|
||||
* 使用的是 Direct 类型的交换机,会将接收到的消息根据 规则 路由到指定的Queue(队列),因此称为路由模式
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param rule 绑定规则 一个字符串即可
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> routingSendMessage(String exchange, String rule, Object obj, String msg) {
|
||||
|
||||
log.info("【路由模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【路由模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success("消息发送成功");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Topic主题模型模型
|
||||
* 使用的是 topic 类型的交换机
|
||||
*
|
||||
* @param exchange 交换机名称
|
||||
* @param rule 绑定规则 可以绑定多个单词以 . 拼接 也可以使用 #(匹配 零个 一个 或 多个 单词) 或 *(匹配 一个 单词) 通配符(例如:name.msg, *.msg, age.# )
|
||||
* @param obj 发送的消息Object
|
||||
* @param msg 响应的内容
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> topicSendMessage(String exchange, String rule, Object obj, String msg) {
|
||||
|
||||
log.info("【主题模型mq】 : method: 【 workSendMessage 】 - ages: 【 String : {}, Object : {}, String : {} 】 ---> 【 消息发送中。。。 】", exchange, obj, msg);
|
||||
// 发送简单模型消息
|
||||
// 第一个参数: 绑定规则 相当于 队列名称
|
||||
// 第二个参数:消息内容
|
||||
rabbitTemplate.convertAndSend(exchange, rule, obj, message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
} );
|
||||
|
||||
log.info("【主题模型mq】 : method: 【 workSendMessage 】- exchange: 【 {} 】 ---> 【 消息发送成功 】", exchange);
|
||||
|
||||
return Result.success(obj,"消息发送成功");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 延迟队列模型
|
||||
* @param param 传输内容
|
||||
* @param delayTime 延迟时间
|
||||
* @return 结果集
|
||||
*/
|
||||
public Result<?> delayedSendMessage(Long delayTime, Object param) {
|
||||
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送中。。。 】",param);
|
||||
|
||||
rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYED_EXCHANGE_NAME, RabbitmqConstants.DELAYED_ROUTING_KEY,param, message -> {
|
||||
MessageProperties messageProperties = message.getMessageProperties();
|
||||
messageProperties.setMessageId(UUID.randomUUID().toString());
|
||||
messageProperties.setDelayLong(delayTime);
|
||||
return message;
|
||||
});
|
||||
log.info("【延迟队列模型】 : method: 【 delayedSendMessage 】 消息内容:{}---> 【 消息发送成功 】",param);
|
||||
|
||||
return Result.success(param,"消息发送成功");
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -1 +1,7 @@
|
|||
com.muyu.common.rabbit.RabbitListenerConfigurer
|
||||
com.muyu.rabbitmq.producer.RabbitMQProducerUtil
|
||||
com.muyu.rabbitmq.consumer.RabbitMQConsumerUtil
|
||||
com.muyu.rabbitmq.config.RabbitmqConfig
|
||||
com.muyu.rabbitmq.config.MyConfirmCallback
|
||||
com.muyu.rabbitmq.config.DelayedQueueConfig
|
||||
com.muyu.rabbitmq.config.RabbitAdminConfig
|
||||
com.muyu.rabbitmq.config.ReturnCallbackConfig
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
<module>cloud-common-rabbit</module>
|
||||
<module>cloud-common-cache</module>
|
||||
<module>cloud-common-kafka</module>
|
||||
<module>cloud-common-iotdb</module>
|
||||
<module>cloud-common-caffeine</module>
|
||||
</modules>
|
||||
|
||||
<artifactId>cloud-common</artifactId>
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-data-process</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-modules-data-process-common</artifactId>
|
||||
|
||||
<description>
|
||||
cloud-modules-data-process-common 数据处理公共模块
|
||||
</description>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.swagger.core.v3</groupId>
|
||||
<artifactId>swagger-annotations-jakarta</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,25 @@
|
|||
package com.muyu.data.basics;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件队列
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public class EventHandler {
|
||||
|
||||
private static final ThreadLocal<EventQueueConfig> EVENT_THREAD = new ThreadLocal<>();
|
||||
|
||||
public static void set(final EventQueueConfig handler) {
|
||||
EVENT_THREAD.set(handler);
|
||||
}
|
||||
|
||||
public static EventQueueConfig get() {
|
||||
return EVENT_THREAD.get();
|
||||
}
|
||||
|
||||
public static void remove(){
|
||||
EVENT_THREAD.remove();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.muyu.data.basics;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件处理基础类
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public abstract class EventProcessBasics {
|
||||
|
||||
/**
|
||||
* 下一个事件对象
|
||||
*/
|
||||
protected EventProcessBasics nextEvent;
|
||||
|
||||
/**
|
||||
* 下一个事件
|
||||
* @param nextHandler 下一个事件处理
|
||||
*/
|
||||
public void setNextHandler(EventProcessBasics nextHandler) {
|
||||
this.nextEvent = nextHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* 事件处理抽象类
|
||||
* @param eventKey 事件唯一key
|
||||
*/
|
||||
public abstract void handleEvent(String eventKey);
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.muyu.data.basics;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件队列配置
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class EventQueueConfig {
|
||||
|
||||
private LinkedBlockingDeque<EventProcessBasics> taskNodeQueue = new LinkedBlockingDeque<>();
|
||||
|
||||
public void addEvent(EventProcessBasics obj){
|
||||
this.taskNodeQueue.add(obj);
|
||||
}
|
||||
|
||||
public boolean hashEventNext(){
|
||||
return !taskNodeQueue.isEmpty();
|
||||
}
|
||||
|
||||
private EventProcessBasics nextTaskNode(){
|
||||
return taskNodeQueue.poll();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.muyu.data.basics;
|
||||
|
||||
import com.muyu.data.domain.EventActuate;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件启动类
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public class StartEvent extends ApplicationEvent {
|
||||
|
||||
private EventActuate eventActuate;
|
||||
|
||||
public StartEvent(EventActuate source) {
|
||||
super(source);
|
||||
this.eventActuate = source;
|
||||
}
|
||||
|
||||
public EventActuate getEventActuate() {
|
||||
return eventActuate;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.muyu.data.constant;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件常量
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
public interface EventConstant {
|
||||
|
||||
String STORAGE_EVENT = "storageEvent";
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.muyu.data.domain;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/30
|
||||
* @Description JSON数据对象
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Tag(name = "ionDB数据源对象")
|
||||
public class DataJSON {
|
||||
/**
|
||||
* 时间戳
|
||||
*/
|
||||
@Schema(name = "时间戳")
|
||||
private Long timestamp;
|
||||
|
||||
/**
|
||||
* 车辆JSON数据
|
||||
*/
|
||||
@Schema(name = "车辆JSON数据")
|
||||
private String datasource;
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.muyu.data.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 事件驱动对象
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class EventActuate {
|
||||
/**
|
||||
* json数据
|
||||
*/
|
||||
private String jsonData;
|
||||
/**
|
||||
* 事件驱动key集合
|
||||
*/
|
||||
private List<String> eventKeys;
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package com.muyu.data.event;
|
||||
|
||||
import com.muyu.data.basics.StartEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 自启动事件监听器
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@Component
|
||||
public class AutoStartupEventListener implements ApplicationListener<StartEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(StartEvent event) {
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.muyu.data.event;
|
||||
|
||||
import com.muyu.data.basics.EventProcessBasics;
|
||||
import com.muyu.data.constant.EventConstant;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
||||
/**
|
||||
* @Author WangXin
|
||||
* @Data 2024/9/29
|
||||
* @Description 存储事件
|
||||
* @Version 1.0.0
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Log4j2
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class StorageEvent extends EventProcessBasics {
|
||||
/**
|
||||
* 事件名称
|
||||
*/
|
||||
private String eventName;
|
||||
|
||||
@Override
|
||||
public void handleEvent(String eventKey) {
|
||||
if (eventKey.equals(eventName)){
|
||||
log.info("开始执行 [{}] 事件", eventKey);
|
||||
|
||||
}else if (nextEvent != null){
|
||||
nextEvent.handleEvent(eventKey);
|
||||
}else {
|
||||
log.info("处理结束,最后处理的事件为 [{}]", eventKey);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
com.muyu.data.iotdb.config.IotDBSessionConfig
|
|
@ -0,0 +1,104 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules-data-process</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-modules-data-process-server</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<description>
|
||||
cloud-modules-data-process-server 数据处理服务模块
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<!-- SpringCloud Alibaba Nacos -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Nacos Config -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Sentinel -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringBoot Actuator -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Mysql Connector -->
|
||||
<dependency>
|
||||
<groupId>com.mysql</groupId>
|
||||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataSource -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-datasource</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common DataScope -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-datascope</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MuYu Common Log -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-log</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 接口模块 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-api-doc</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- XllJob定时任务 -->
|
||||
<dependency>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-common-xxl</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>${project.artifactId}</finalName>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,14 @@
|
|||
package com.muyu.data;
|
||||
|
||||
import com.muyu.common.security.annotation.EnableCustomConfig;
|
||||
import com.muyu.common.security.annotation.EnableMyFeignClients;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@EnableCustomConfig
|
||||
@EnableMyFeignClients
|
||||
@SpringBootApplication
|
||||
public class DataProcessApplication {
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Hello world!");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
# Tomcat
|
||||
server:
|
||||
port: 9701
|
||||
|
||||
# nacos线上地址
|
||||
nacos:
|
||||
addr: 123.57.152.124:8848
|
||||
user-name: nacos
|
||||
password: nacos
|
||||
namespace: five
|
||||
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
|
||||
# Spring
|
||||
spring:
|
||||
amqp:
|
||||
deserialization:
|
||||
trust:
|
||||
all: true
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
application:
|
||||
# 应用名称
|
||||
name: cloud-modules-data-process-server
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: dev
|
||||
cloud:
|
||||
nacos:
|
||||
discovery:
|
||||
# 服务注册地址
|
||||
server-addr: ${nacos.addr}
|
||||
# nacos用户名
|
||||
username: ${nacos.user-name}
|
||||
# nacos密码
|
||||
password: ${nacos.password}
|
||||
# 命名空间
|
||||
namespace: ${nacos.namespace}
|
||||
config:
|
||||
# 服务注册地址
|
||||
server-addr: ${nacos.addr}
|
||||
# nacos用户名
|
||||
username: ${nacos.user-name}
|
||||
# nacos密码
|
||||
password: ${nacos.password}
|
||||
# 命名空间
|
||||
namespace: ${nacos.namespace}
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
shared-configs:
|
||||
# 系统共享配置
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# 系统环境Config共享配置
|
||||
- application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
# xxl-job 配置文件
|
||||
- application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
|
@ -0,0 +1,74 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/cloud-modules-data-process-server"/>
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统日志输出 -->
|
||||
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>ERROR</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- 系统模块日志级别控制 -->
|
||||
<logger name="com.muyu" level="info"/>
|
||||
<!-- Spring日志级别控制 -->
|
||||
<logger name="org.springframework" level="warn"/>
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="file_info"/>
|
||||
<appender-ref ref="file_error"/>
|
||||
</root>
|
||||
</configuration>
|
|
@ -0,0 +1,81 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/cloud-modules-data-process-server"/>
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
|
||||
<property name="log.sky.pattern" value="%d{HH:mm:ss.SSS} %yellow([%tid]) [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${log.sky.pattern}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统日志输出 -->
|
||||
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>ERROR</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- 使用gRpc将日志发送到skywalking服务端 -->
|
||||
<appender name="GRPC_LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
|
||||
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
|
||||
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
|
||||
<Pattern>${log.sky.pattern}</Pattern>
|
||||
</layout>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统模块日志级别控制 -->
|
||||
<logger name="com.muyu" level="info"/>
|
||||
<!-- Spring日志级别控制 -->
|
||||
<logger name="org.springframework" level="warn"/>
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="GRPC_LOG"/>
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="file_info"/>
|
||||
<appender-ref ref="file_error"/>
|
||||
</root>
|
||||
</configuration>
|
|
@ -0,0 +1,81 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/cloud-modules-data-process-server"/>
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
|
||||
<property name="log.sky.pattern" value="%d{HH:mm:ss.SSS} %yellow([%tid]) [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${log.sky.pattern}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统日志输出 -->
|
||||
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>ERROR</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- 使用gRpc将日志发送到skywalking服务端 -->
|
||||
<appender name="GRPC_LOG" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
|
||||
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
|
||||
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
|
||||
<Pattern>${log.sky.pattern}</Pattern>
|
||||
</layout>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统模块日志级别控制 -->
|
||||
<logger name="com.muyu" level="info"/>
|
||||
<!-- Spring日志级别控制 -->
|
||||
<logger name="org.springframework" level="warn"/>
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="GRPC_LOG"/>
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="file_info"/>
|
||||
<appender-ref ref="file_error"/>
|
||||
</root>
|
||||
</configuration>
|
|
@ -0,0 +1,29 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.muyu</groupId>
|
||||
<artifactId>cloud-modules</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-modules-data-process</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<description>
|
||||
cloud-modules-data-process数据处理模块
|
||||
</description>
|
||||
<modules>
|
||||
<module>cloud-modules-data-process-server</module>
|
||||
<module>cloud-modules-data-process-common</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -9,10 +9,10 @@
|
|||
<version>3.6.3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>cloud-modules-vehicleGateway</artifactId>
|
||||
<artifactId>cloud-modules-vehicle-gateway</artifactId>
|
||||
|
||||
<description>
|
||||
cloud-modules-vehicleGateway车辆网关模块
|
||||
cloud-modules-vehicleGateway 车辆网关模块
|
||||
</description>
|
||||
|
||||
<properties>
|
|
@ -0,0 +1,2 @@
|
|||
Spring Boot Version: ${spring-boot.version}
|
||||
Spring Application Name: ${spring.application.name}
|
|
@ -15,6 +15,8 @@
|
|||
<module>cloud-vx</module>
|
||||
<module>cloud-modules-parse</module>
|
||||
<module>cloud-modules-enterprise</module>
|
||||
<!-- <module>cloud-modules-vehicle-gateway</module>-->
|
||||
<module>cloud-modules-data-process</module>
|
||||
</modules>
|
||||
|
||||
<artifactId>cloud-modules</artifactId>
|
||||
|
|
21
pom.xml
21
pom.xml
|
@ -43,8 +43,10 @@
|
|||
<hutool.version>5.8.27</hutool.version>
|
||||
<knife4j-openapi3.version>4.1.0</knife4j-openapi3.version>
|
||||
<xxl-job-core.version>2.4.1</xxl-job-core.version>
|
||||
<iotdb.version>1.3.1</iotdb.version>
|
||||
<swagger.annotations.version>2.2.8</swagger.annotations.version>
|
||||
<caffeine.version>2.9.3</caffeine.version>
|
||||
<mqtt.version>1.2.5</mqtt.version>
|
||||
<mybits-plus-business>4.1.65.Final</mybits-plus-business>
|
||||
</properties>
|
||||
|
||||
<!-- 依赖声明 -->
|
||||
|
@ -293,13 +295,17 @@
|
|||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>${mqtt.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- mybits-plus-business 业务依赖 -->
|
||||
<!-- iotdb -->
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<version>${mybits-plus-business}</version>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-session</artifactId>
|
||||
<version>${iotdb.version}</version>
|
||||
</dependency>
|
||||
<!-- caffeine -->
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>${caffeine.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
@ -326,6 +332,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.13.0</version>
|
||||
<configuration>
|
||||
<source>${java.version}</source>
|
||||
<target>${java.version}</target>
|
||||
|
|
Loading…
Reference in New Issue