feat()解析并添加到iotDB数据库

dev
20300 2024-06-14 19:01:37 +08:00
parent 9c72d245a9
commit 84a91d275c
21 changed files with 1206 additions and 13 deletions

26
pom.xml
View File

@ -14,6 +14,32 @@
<spring-boot.version>2.6.13</spring-boot.version> <spring-boot.version>2.6.13</spring-boot.version>
</properties> </properties>
<dependencies> <dependencies>
<!-- iotDB-->
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.14.0-preview1</version>
</dependency>
<!-- redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.bouncycastle</groupId> <groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId> <artifactId>bcpkix-jdk15on</artifactId>

View File

@ -0,0 +1,36 @@
//package com.hyc.config;
//
///**
// * kafka初始化配置类
// *
// * @author YouChe·He
// * @ClassName: KafkaConfig
// * @Description: kafka初始化配置类
// * @CreateTime: 2024/6/10 10:30
// */
//import org.apache.kafka.clients.admin.AdminClient;
//import org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.admin.NewTopic;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//import java.util.Properties;
//
//@Configuration
//public class KafkaConfig {
//
//
//
// @Bean
// public AdminClient adminClient() {
// Properties properties = new Properties();
// properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "47.103.75.98:9092");
// return AdminClient.create(properties);
// }
//
// @Bean
// public NewTopic createTopic() {
// return new NewTopic("topichyc", 8, (short) 1);
// }
//}
//

View File

@ -0,0 +1,33 @@
package com.hyc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redsi
*
* @author YouChe·He
* @ClassName: RedisConfoig
* @Description: redsi
* @CreateTime: 2024/5/27 14:16
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,String> redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<Object>(Object.class));
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}

View File

@ -124,8 +124,16 @@ public class SummarizeController {
HashMap<String, String> stringStringHashMap = new HashMap<>(); HashMap<String, String> stringStringHashMap = new HashMap<>();
for (int i = 0; i < 47; i++) { for (int i = 0; i < 47; i++) {
String substring = realString.substring(count, count + intArr[i]); String substring = realString.substring(count, count + intArr[i]);
linkedHashMap.put(strArr[i],substring);
if (strArr[i]=="vin" || strArr[i] == "gear"){
linkedHashMap.put(strArr[i],substring);
}else if (strArr[i] == "startTime") {
linkedHashMap.put(strArr[i],Long.valueOf(substring));
} else {
linkedHashMap.put(strArr[i],Double.valueOf(substring));
}
count = count + intArr[i]; count = count + intArr[i];
} }
log.warn("hashMap:{}",linkedHashMap); log.warn("hashMap:{}",linkedHashMap);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
@ -134,7 +142,7 @@ public class SummarizeController {
log.error("json格式:{}",jsonData); log.error("json格式:{}",jsonData);
transactionTemplate.execute(status -> { transactionTemplate.execute(status -> {
try { try {
kafkaTemplate.send("topic1", UUID.randomUUID().toString(), jsonData); kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), jsonData);
status.flush(); status.flush();
}catch (Exception e){ }catch (Exception e){
log.error("kafka生产异常: {}", e.getMessage()); log.error("kafka生产异常: {}", e.getMessage());

View File

@ -0,0 +1,33 @@
package com.hyc.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
*
*
* @author YouChe·He
* @ClassName: CarIncidentHandle
* @Description:
* @CreateTime: 2024/6/10 10:25
*/
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class CarIncidentHandle {
/**
* id
*/
private Double id;
/**
* VIN
*/
private String vin;
/**
* 1:2:
*/
private String carIncident;
}

View File

@ -44,6 +44,6 @@ public class TimeSortCar implements Comparable<TimeSortCar> {
@Override @Override
public int compareTo(TimeSortCar o) { public int compareTo(TimeSortCar o) {
return Integer.compare((int) o.connectDuration,(int) this.connectDuration); return Double.compare((int) o.connectDuration,(int) this.connectDuration);
} }
} }

View File

@ -0,0 +1,186 @@
package com.hyc.iotdbdemo.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;
/**
* description: iotdb
* root.a1eaKSRpRty.CA3013A303A25467
* root.a1eaKSRpRty.CA3013A303A25467.heart root.a1eaKSRpRty
* author: zhouhong
*/
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
private static Session session;
private static final String LOCAL_HOST = "47.103.75.98";
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
// 设置时区
session.setTimeZone("+08:00");
}
return session;
}
/**
* description: - insertRecord
* author: zhouhong
* @param * @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* time:
* measurementsList
* type BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6);
* valuesList ---
* @return
*/
public void insertRecordType(String deviceId, Long time,List<String> measurementsList, TSDataType type,List<Object> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() != valuesList.size()) {
throw new ServerException("measurementsList 与 valuesList 值不对应");
}
List<TSDataType> types = new ArrayList<>();
measurementsList.forEach(item -> {
types.add(type);
});
session.insertRecord(deviceId, time, measurementsList, types, valuesList);
}
/**
* description: - insertRecord
* author: zhouhong
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param measurementsList
* @param valuesList ---
* @return
*/
public void insertRecord(String deviceId, Long time,List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: zhouhong
*/
public void insertRecords(List<String> deviceIdList, List<Long> timeList, List<List<String>> measurementsList, List<List<String>> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecords(deviceIdList, timeList, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: zhouhong
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param schemaList: + List<MeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64));
* @param maxRowNumber
* @return
*/
public void insertTablet(String deviceId, Long time,List<MeasurementSchema> schemaList, List<Object> valueList,int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException {
Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
// 向iotdb里面添加数据
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, time);
for (int i = 0; i < valueList.size(); i++) {
tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i));
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
}
/**
* description: SQL
* author: zhouhong
*/
public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
return session.executeQueryStatement(sql);
}
/**
* description: root.a1eaKSRpRty
* author: zhouhong
* @param groupName
* @return
*/
public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroup(groupName);
}
/**
* description: Timeseries root.a1eaKSRpRty.CA3013A303A25467.breath
* author: zhouhong
*/
public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseries);
}
/**
* description: Timeseries
* author: zhouhong
*/
public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseriesList);
}
/**
* description:
* author: zhouhong
*/
public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroups(storageGroupList);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(path, endTime);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, endTime);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathListAndTime(List<String> pathList, Long startTime,Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, startTime, endTime);
}
}

View File

@ -0,0 +1,63 @@
package com.hyc.iotdbdemo.controller;
import com.hyc.domain.VehicleData;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.model.param.IotDbParam;
import com.hyc.iotdbdemo.response.ResponseData;
import com.hyc.iotdbdemo.server.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.rmi.ServerException;
/**
* description: iotdb
* date: 2022/8/15 21:50
* author: zhouhong
*/
@Log4j2
@RestController
public class IotDbController {
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
/**
*
* @param iotDbParam
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
//iotDbServer.insertData(iotDbParam);
return ResponseData.success();
}
/**
*
* @param iotDbParam
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam vehicleData) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(vehicleData));
}
/**
*
* @return
*/
@PostMapping("/api/device/deleteGroup")
public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
return ResponseData.success();
}
}

View File

@ -0,0 +1,25 @@
package com.hyc.iotdbdemo.model.param;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:53
* author: zhouhong
*/
@Data
public class IotDbParam {
/***
*
*/
private String vin;
/***
*
*/
private String startTime;
/***
*
*/
private String endTime;
}

View File

@ -0,0 +1,33 @@
package com.hyc.iotdbdemo.model.result;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:56
* author: zhouhong
*/
@Data
public class IotDbResult {
/***
*
*/
private String time;
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
}

View File

@ -0,0 +1,81 @@
package com.hyc.iotdbdemo.response;
/**
* description:
* date: 2022/8/15 21:30
* author: zhouhong
*/
public class ErrorResponseData extends ResponseData {
private String exceptionClazz;
ErrorResponseData(String message) {
super(false, DEFAULT_ERROR_CODE, message, message, (Object)null);
}
public ErrorResponseData(Integer code, String message) {
super(false, code, message, message, (Object)null);
}
ErrorResponseData(Integer code, String message, Object object) {
super(false, code, message, object);
}
ErrorResponseData(Integer code, String message, String localizedMsg, Object object) {
super(false, code, message, localizedMsg, object);
}
@Override
public boolean equals(final Object o) {
if (o == this) {
return true;
} else if (!(o instanceof ErrorResponseData)) {
return false;
} else {
ErrorResponseData other = (ErrorResponseData)o;
if (!other.canEqual(this)) {
return false;
} else if (!super.equals(o)) {
return false;
} else {
Object this$exceptionClazz = this.getExceptionClazz();
Object other$exceptionClazz = other.getExceptionClazz();
if (this$exceptionClazz == null) {
if (other$exceptionClazz != null) {
return false;
}
} else if (!this$exceptionClazz.equals(other$exceptionClazz)) {
return false;
}
return true;
}
}
}
@Override
protected boolean canEqual(final Object other) {
return other instanceof ErrorResponseData;
}
@Override
public int hashCode() {
int result = super.hashCode();
Object $exceptionClazz = this.getExceptionClazz();
result = result * 59 + ($exceptionClazz == null ? 43 : $exceptionClazz.hashCode());
return result;
}
public String getExceptionClazz() {
return this.exceptionClazz;
}
public void setExceptionClazz(final String exceptionClazz) {
this.exceptionClazz = exceptionClazz;
}
@Override
public String toString() {
return "ErrorResponseData(exceptionClazz=" + this.getExceptionClazz() + ")";
}
}

View File

@ -0,0 +1,216 @@
package com.hyc.iotdbdemo.response;
/**
* description:
* date: 2022/8/15 21:32
* author: zhouhong
*/
public class ResponseData {
public static final String DEFAULT_SUCCESS_MESSAGE = "请求成功";
public static final String DEFAULT_ERROR_MESSAGE = "网络异常";
public static final Integer DEFAULT_SUCCESS_CODE = 200;
public static final Integer DEFAULT_ERROR_CODE = 500;
private Boolean success;
private Integer code;
private String message;
private String localizedMsg;
private Object data;
public ResponseData() {
}
public ResponseData(Boolean success, Integer code, String message, Object data) {
this.success = success;
this.code = code;
this.message = message;
this.data = data;
}
public ResponseData(Boolean success, Integer code, String message, String localizedMsg, Object data) {
this.success = success;
this.code = code;
this.message = message;
this.localizedMsg = localizedMsg;
this.data = data;
}
public ResponseData(Boolean success, Integer code, String message) {
this.success = success;
this.code = code;
this.message = message;
}
public static SuccessResponseData success() {
return new SuccessResponseData();
}
public static SuccessResponseData success(Object object) {
return new SuccessResponseData(object);
}
public static SuccessResponseData success(Integer code, String message, Object object) {
return new SuccessResponseData(code, message, object);
}
public static SuccessResponseData success(Integer code, String message) {
return new SuccessResponseData(code, message);
}
public static SuccessResponseData success(Integer code, String message, String localizedMsg, Object object) {
return new SuccessResponseData(code, message, localizedMsg, object);
}
public static ErrorResponseData error(String message) {
return new ErrorResponseData(message);
}
public static ErrorResponseData error(Integer code, String message) {
return new ErrorResponseData(code, message);
}
public static ErrorResponseData error(Integer code, String message, Object object) {
return new ErrorResponseData(code, message, object);
}
public static ErrorResponseData error(Integer code, String message, String localizedMsg, Object object) {
return new ErrorResponseData(code, message, localizedMsg, object);
}
public Boolean getSuccess() {
return this.success;
}
public Integer getCode() {
return this.code;
}
public String getMessage() {
return this.message;
}
public String getLocalizedMsg() {
return this.localizedMsg;
}
public Object getData() {
return this.data;
}
public void setSuccess(final Boolean success) {
this.success = success;
}
public void setCode(final Integer code) {
this.code = code;
}
public void setMessage(final String message) {
this.message = message;
}
public void setLocalizedMsg(final String localizedMsg) {
this.localizedMsg = localizedMsg;
}
public void setData(final Object data) {
this.data = data;
}
@Override
public boolean equals(final Object o) {
if (o == this) {
return true;
} else if (!(o instanceof ResponseData)) {
return false;
} else {
ResponseData other = (ResponseData)o;
if (!other.canEqual(this)) {
return false;
} else {
label71: {
Object this$success = this.getSuccess();
Object other$success = other.getSuccess();
if (this$success == null) {
if (other$success == null) {
break label71;
}
} else if (this$success.equals(other$success)) {
break label71;
}
return false;
}
Object this$code = this.getCode();
Object other$code = other.getCode();
if (this$code == null) {
if (other$code != null) {
return false;
}
} else if (!this$code.equals(other$code)) {
return false;
}
label57: {
Object this$message = this.getMessage();
Object other$message = other.getMessage();
if (this$message == null) {
if (other$message == null) {
break label57;
}
} else if (this$message.equals(other$message)) {
break label57;
}
return false;
}
Object this$localizedMsg = this.getLocalizedMsg();
Object other$localizedMsg = other.getLocalizedMsg();
if (this$localizedMsg == null) {
if (other$localizedMsg != null) {
return false;
}
} else if (!this$localizedMsg.equals(other$localizedMsg)) {
return false;
}
Object this$data = this.getData();
Object other$data = other.getData();
if (this$data == null) {
if (other$data == null) {
return true;
}
} else if (this$data.equals(other$data)) {
return true;
}
return false;
}
}
}
protected boolean canEqual(final Object other) {
return other instanceof ResponseData;
}
@Override
public int hashCode() {
int result1 = 1;
Object $success = this.getSuccess();
int result = result1 * 59 + ($success == null ? 43 : $success.hashCode());
Object $code = this.getCode();
result = result * 59 + ($code == null ? 43 : $code.hashCode());
Object $message = this.getMessage();
result = result * 59 + ($message == null ? 43 : $message.hashCode());
Object $localizedMsg = this.getLocalizedMsg();
result = result * 59 + ($localizedMsg == null ? 43 : $localizedMsg.hashCode());
Object $data = this.getData();
result = result * 59 + ($data == null ? 43 : $data.hashCode());
return result;
}
@Override
public String toString() {
return "ResponseData(success=" + this.getSuccess() + ", code=" + this.getCode() + ", message=" + this.getMessage() + ", localizedMsg=" + this.getLocalizedMsg() + ", data=" + this.getData() + ")";
}
}

View File

@ -0,0 +1,28 @@
package com.hyc.iotdbdemo.response;
/**
* description:
* date: 2022/8/15 21:40
* author: zhouhong
*/
public class SuccessResponseData extends ResponseData {
public SuccessResponseData() {
super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", (Object)null);
}
public SuccessResponseData(Object object) {
super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", object);
}
public SuccessResponseData(Integer code, String message, Object object) {
super(true, code, message, message, object);
}
public SuccessResponseData(Integer code, String message, String localizedMsg, Object object) {
super(true, code, message, localizedMsg, object);
}
public SuccessResponseData(Integer code, String message) {
super(true, code, message);
}
}

View File

@ -0,0 +1,26 @@
package com.hyc.iotdbdemo.server;
import com.hyc.domain.VehicleData;
import com.hyc.iotdbdemo.model.param.IotDbParam;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
/**
* description: iot
* date: 2022/8/15 21:41
* author: zhouhong
*/
public interface IotDbServer {
/**
*
*/
void insertData(VehicleData vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException;
/**
*
*/
Object queryDataFromIotDb(IotDbParam vehicleData) throws Exception;
}

View File

@ -0,0 +1,251 @@
package com.hyc.iotdbdemo.server.impl;
import com.hyc.domain.VehicleData;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.model.param.IotDbParam;
import com.hyc.iotdbdemo.model.result.IotDbResult;
import com.hyc.iotdbdemo.server.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* description: iot
* date: 2022/8/15 9:43
* author: zhouhong
*/
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(VehicleData vehicleData) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.bizkey."+vehicleData.getVin();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = getInsertList();
List<String> valuesList = getInsertValueList(vehicleData);
iotDBSessionConfig.insertRecord(deviceId, vehicleData.getStartTime(), measurementsList, valuesList);
}
@Override
public List<VehicleData> queryDataFromIotDb(IotDbParam vehicleData) throws Exception {
List<VehicleData> iotDbResultList = new ArrayList<>();
log.error("查询数据的参数是:{}",vehicleData);
if (null != vehicleData.getVin()) {
String sql = "select * from root.bizkey."+ vehicleData.getVin() + " where time >= "
+ vehicleData.getStartTime() + " and time < " + vehicleData.getEndTime();
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(vehicleData, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空");
}
return iotDbResultList;
}
/**
*
* @param iotDbParam
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(IotDbParam iotDbParam, List<VehicleData> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
VehicleData iotDbResult = new VehicleData();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
long timestamp = next.getTimestamp();
// new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
iotDbResult.setStartTime(timestamp);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
iotDbResult.setStartTime(timestamp);
iotDbResult.setVin(map.get("vin"));
iotDbResult.setLongitude(Double.valueOf(map.get("longitude")));
iotDbResult.setLatitude(Double.valueOf(map.get("latitude")));
iotDbResult.setSpeed(Double.valueOf(map.get("speed")));
iotDbResult.setMileage(Double.valueOf(map.get("mileage")));
iotDbResult.setVoltage(Double.valueOf(map.get("voltage")));
iotDbResult.setCurrent(Double.valueOf(map.get("current")));
iotDbResult.setResistance(Double.valueOf(map.get("resistance")));
iotDbResult.setGear(map.get("gear"));
iotDbResult.setAccelerationPedal(Double.valueOf(map.get("accelerationPedal")));
iotDbResult.setBrakePedal(Double.valueOf(map.get("brakePedal")));
iotDbResult.setFuelConsumptionRate(Double.valueOf(map.get("fuelConsumptionRate")));
iotDbResult.setMotorControllerTemperature(Double.valueOf(map.get("motorControllerTemperature")));
iotDbResult.setMotorSpeed(Double.valueOf(map.get("motorSpeed")));
iotDbResult.setMotorTorque(Double.valueOf(map.get("motorTorque")));
iotDbResult.setMotorTemperature(Double.valueOf(map.get("motorTemperature")));
iotDbResult.setMotorVoltage(Double.valueOf(map.get("motorVoltage")));
iotDbResult.setMotorCurrent(Double.valueOf(map.get("motorCurrent")));
iotDbResult.setRemainingBattery(Double.valueOf(map.get("remainingBattery")));
iotDbResult.setMaximumFeedbackPower(Double.valueOf(map.get("maximumFeedbackPower")));
iotDbResult.setMaximumDischargePower(Double.valueOf(map.get("maximumDischargePower")));
iotDbResult.setSelfCheckCounter(Double.valueOf(map.get("selfCheckCounter")));
iotDbResult.setTotalBatteryCurrent(Double.valueOf(map.get("totalBatteryCurrent")));
iotDbResult.setTotalBatteryVoltage(Double.valueOf(map.get("totalBatteryVoltage")));
iotDbResult.setSingleBatteryMaxVoltage(Double.valueOf(map.get("singleBatteryMaxVoltage")));
iotDbResult.setSingleBatteryMinVoltage(Double.valueOf(map.get("singleBatteryMinVoltage")));
iotDbResult.setSingleBatteryMaxTemperature(Double.valueOf(map.get("singleBatteryMaxTemperature")));
iotDbResult.setSingleBatteryMinTemperature(Double.valueOf(map.get("singleBatteryMinTemperature")));
iotDbResult.setAvailableBatteryCapacity(Double.valueOf(map.get("availableBatteryCapacity")));
iotDbResult.setVehicleStatus(Double.valueOf(map.get("vehicleStatus")));
iotDbResult.setChargingStatus(Double.valueOf(map.get("chargingStatus")));
iotDbResult.setOperatingStatus(Double.valueOf(map.get("operatingStatus")));
iotDbResult.setSocStatus(Double.valueOf(map.get("socStatus")));
iotDbResult.setChargingEnergyStorageStatus(Double.valueOf(map.get("chargingEnergyStorageStatus")));
iotDbResult.setDriveMotorStatus(Double.valueOf(map.get("driveMotorStatus")));
iotDbResult.setPositionStatus(Double.valueOf(map.get("positionStatus")));
iotDbResult.setEasStatus(Double.valueOf(map.get("easStatus")));
iotDbResult.setPtcStatus(Double.valueOf(map.get("ptcStatus")));
iotDbResult.setEpsStatus(Double.valueOf(map.get("epsStatus")));
iotDbResult.setAbsStatus(Double.valueOf(map.get("absStatus")));
iotDbResult.setMcuStatus(Double.valueOf(map.get("mcuStatus")));
iotDbResult.setHeatingStatus(Double.valueOf(map.get("heatingStatus")));
iotDbResult.setBatteryStatus(Double.valueOf(map.get("batteryStatus")));
iotDbResult.setBatteryInsulationStatus(Double.valueOf(map.get("batteryInsulationStatus")));
iotDbResult.setDcdcStatus(Double.valueOf(map.get("dcdcStatus")));
iotDbResult.setChgStatus(Double.valueOf(map.get("chgStatus")));
iotDbResultList.add(iotDbResult);
}
}
}
public List<String> getInsertList(){
List<String> measurementsList = new ArrayList<>();
measurementsList.add("startTime");
measurementsList.add("longitude");
measurementsList.add("latitude");
measurementsList.add("speed");
measurementsList.add("mileage");
measurementsList.add("voltage");
measurementsList.add("current");
measurementsList.add("resistance");
measurementsList.add("gear");
measurementsList.add("accelerationPedal");
measurementsList.add("brakePedal");
measurementsList.add("fuelConsumptionRate");
measurementsList.add("motorControllerTemperature");
measurementsList.add("motorSpeed");
measurementsList.add("motorTorque");
measurementsList.add("motorTemperature");
measurementsList.add("motorVoltage");
measurementsList.add("motorCurrent");
measurementsList.add("remainingBattery");
measurementsList.add("maximumFeedbackPower");
measurementsList.add("maximumDischargePower");
measurementsList.add("selfCheckCounter");
measurementsList.add("totalBatteryCurrent");
measurementsList.add("totalBatteryVoltage");
measurementsList.add("singleBatteryMaxVoltage");
measurementsList.add("singleBatteryMinVoltage");
measurementsList.add("singleBatteryMaxTemperature");
measurementsList.add("singleBatteryMinTemperature");
measurementsList.add("availableBatteryCapacity");
measurementsList.add("vehicleStatus");
measurementsList.add("chargingStatus");
measurementsList.add("operatingStatus");
measurementsList.add("socStatus");
measurementsList.add("chargingEnergyStorageStatus");
measurementsList.add("driveMotorStatus");
measurementsList.add("positionStatus");
measurementsList.add("easStatus");
measurementsList.add("ptcStatus");
measurementsList.add("epsStatus");
measurementsList.add("absStatus");
measurementsList.add("mcuStatus");
measurementsList.add("heatingStatus");
measurementsList.add("batteryStatus");
measurementsList.add("batteryInsulationStatus");
measurementsList.add("dcdcStatus");
measurementsList.add("chgStatus");
return measurementsList;
}
public List<String> getInsertValueList(VehicleData vehicleData){
List<String> valuesList = new ArrayList<>();
valuesList.add(String.valueOf(vehicleData.getStartTime()));
valuesList.add(String.valueOf(vehicleData.getLongitude()));
valuesList.add(String.valueOf(vehicleData.getLatitude()));
valuesList.add(String.valueOf(vehicleData.getSpeed()));
valuesList.add(String.valueOf(vehicleData.getMileage()));
valuesList.add(String.valueOf(vehicleData.getVoltage()));
valuesList.add(String.valueOf(vehicleData.getCurrent()));
valuesList.add(String.valueOf(vehicleData.getResistance()));
valuesList.add(String.valueOf(vehicleData.getGear()));
valuesList.add(String.valueOf(vehicleData.getAccelerationPedal()));
valuesList.add(String.valueOf(vehicleData.getBrakePedal()));
valuesList.add(String.valueOf(vehicleData.getFuelConsumptionRate()));
valuesList.add(String.valueOf(vehicleData.getMotorControllerTemperature()));
valuesList.add(String.valueOf(vehicleData.getMotorSpeed()));
valuesList.add(String.valueOf(vehicleData.getMotorTorque()));
valuesList.add(String.valueOf(vehicleData.getMotorTemperature()));
valuesList.add(String.valueOf(vehicleData.getMotorVoltage()));
valuesList.add(String.valueOf(vehicleData.getMotorCurrent()));
valuesList.add(String.valueOf(vehicleData.getRemainingBattery()));
valuesList.add(String.valueOf(vehicleData.getMaximumFeedbackPower()));
valuesList.add(String.valueOf(vehicleData.getMaximumDischargePower()));
valuesList.add(String.valueOf(vehicleData.getSelfCheckCounter()));
valuesList.add(String.valueOf(vehicleData.getTotalBatteryCurrent()));
valuesList.add(String.valueOf(vehicleData.getTotalBatteryVoltage()));
valuesList.add(String.valueOf(vehicleData.getSingleBatteryMaxVoltage()));
valuesList.add(String.valueOf(vehicleData.getSingleBatteryMinVoltage()));
valuesList.add(String.valueOf(vehicleData.getSingleBatteryMaxTemperature()));
valuesList.add(String.valueOf(vehicleData.getSingleBatteryMinTemperature()));
valuesList.add(String.valueOf(vehicleData.getAvailableBatteryCapacity()));
valuesList.add(String.valueOf(vehicleData.getVehicleStatus()));
valuesList.add(String.valueOf(vehicleData.getChargingStatus()));
valuesList.add(String.valueOf(vehicleData.getOperatingStatus()));
valuesList.add(String.valueOf(vehicleData.getSocStatus()));
valuesList.add(String.valueOf(vehicleData.getChargingEnergyStorageStatus()));
valuesList.add(String.valueOf(vehicleData.getDriveMotorStatus()));
valuesList.add(String.valueOf(vehicleData.getPositionStatus()));
valuesList.add(String.valueOf(vehicleData.getEasStatus()));
valuesList.add(String.valueOf(vehicleData.getPtcStatus()));
valuesList.add(String.valueOf(vehicleData.getEpsStatus()));
valuesList.add(String.valueOf(vehicleData.getAbsStatus()));
valuesList.add(String.valueOf(vehicleData.getMcuStatus()));
valuesList.add(String.valueOf(vehicleData.getHeatingStatus()));
valuesList.add(String.valueOf(vehicleData.getBatteryStatus()));
valuesList.add(String.valueOf(vehicleData.getBatteryInsulationStatus()));
valuesList.add(String.valueOf(vehicleData.getDcdcStatus()));
valuesList.add(String.valueOf(vehicleData.getChgStatus()));
return valuesList;
}
}

View File

@ -1,11 +1,28 @@
package com.hyc.kafka.demo.consumer; package com.hyc.kafka.demo.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hyc.domain.VehicleData;
import com.hyc.iotdbdemo.config.IotDBSessionConfig;
import com.hyc.iotdbdemo.server.IotDbServer;
import com.hyc.kafka.demo.strategy.Strategy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Random;
/** /**
* kafka * kafka
* *
@ -17,15 +34,59 @@ import org.springframework.stereotype.Service;
@Slf4j @Slf4j
@Service @Service
public class KafkaConsumer { public class KafkaConsumer {
@KafkaListener(topics = "topic1", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
errorHandler = "myKafkaListenerErrorHandler") errorHandler = "myKafkaListenerErrorHandler")
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) { public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
try { try {
Object value = consumerRecord.value(); //策略map集合
log.error("消费者得到的数据:{},所在分区:{}",value,consumerRecord.partition()); LinkedHashMap<String, Strategy> stringStrategyLinkedHashMap = new LinkedHashMap<>();
}finally { stringStrategyLinkedHashMap.put("存储数据", Strategy.STORE_DATA);
stringStrategyLinkedHashMap.put("实时数据",Strategy.REAL_TIME_DATA);
//解析得到VIN
String value = (String) consumerRecord.value();
VehicleData vehicleData = JSONObject.parseObject(value, VehicleData.class);
log.error("消费者0得到的数据:{},所在分区:{}",vehicleData.toString(),consumerRecord.partition());
iotDbServer.insertData(vehicleData);
// String dataMessage = value.toString();
// JSONObject jsonObject = JSON.parseObject(dataMessage);
// //根据VIN得到该小车拥有的事件
// String vin = jsonObject.get("vin").toString();
// List<String> eventList = getEvent(vin);
// //循环事件集合,并执行响应的事件
// for (String event : eventList) {
// stringStrategyLinkedHashMap.get(event).exe(redisTemplate,dataMessage);
// }
} catch (ServerException e) {
log.error("添加iotDb异常");
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} finally {
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
}
public List<String> getEvent(String vin){
ArrayList<String> strings = new ArrayList<>();
strings.add("存储数据");
int nextInt = new Random().nextInt(100);
if (nextInt % 2 ==0){
strings.add("实时数据");
}
return strings;
} }
} }

View File

@ -0,0 +1,30 @@
package com.hyc.kafka.demo.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
* kafka
*
* @author YouChe·He
* @ClassName: KafkaConsumer
* @Description: kafka
* @CreateTime: 2024/6/6 15:33
*/
@Slf4j
@Service
public class KafkaConsumer1 {
@KafkaListener(topics = "topichyc", groupId = "firstGroup", containerFactory = "kafkaListenerContainerFactory",
errorHandler = "myKafkaListenerErrorHandler")
public void consume(ConsumerRecord<Object,Object> consumerRecord, Acknowledgment acknowledgment) {
try {
Object value = consumerRecord.value();
log.error("消费者1得到的数据:{},所在分区:{}",value,consumerRecord.partition());
}finally {
acknowledgment.acknowledge();
}
}
}

View File

@ -33,8 +33,7 @@ public class KafkaController {
@Transactional @Transactional
public void sendMessage(String message) { public void sendMessage(String message) {
System.out.println("呼呼呼"); System.out.println("呼呼呼");
kafkaTemplate.send("topichyc", UUID.randomUUID().toString(), message);
kafkaTemplate.send("topic1", UUID.randomUUID().toString(), message);
} }
} }

View File

@ -0,0 +1,21 @@
package com.hyc.kafka.demo.event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
*
*
* @author YouChe·He
* @ClassName: EventPosting
* @Description:
* @CreateTime: 2024/6/11 08:44
*/
@Component
public class EventPosting {
@Autowired
private RedisTemplate<String,String> redisTemplate;
}

View File

@ -0,0 +1,37 @@
package com.hyc.kafka.demo.strategy;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
/**
*
*
* @author YouChe·He
* @ClassName: StrategyA
* @Description: A
* @CreateTime: 2024/6/4 14:13
*/
@Slf4j
public enum Strategy {
REAL_TIME_DATA{
@Override
public void exe(RedisTemplate<String,String> redisTemplate, String realData){
JSONObject jsonObject = JSON.parseObject(realData);
String vin = jsonObject.get("vin").toString();
log.error("vin是:{}",vin);
if (redisTemplate.hasKey(vin)){
redisTemplate.opsForList().rightPush(vin,realData);
}
}
},
STORE_DATA{
@Override
public void exe(RedisTemplate<String,String> redisTemplate, String realData){
System.out.println("执行具体策略B");
}
};
public abstract void exe(RedisTemplate<String,String> redisTemplate, String realData);
}

View File

@ -5,7 +5,7 @@ server:
spring: spring:
rabbitmq: rabbitmq:
host: 115.159.47.91 host: 47.103.75.98
port: 5672 port: 5672
username: guest username: guest
password: guest password: guest
@ -51,7 +51,7 @@ spring:
kafka: kafka:
producer: producer:
# Kafka服务器 # Kafka服务器
bootstrap-servers: 115.159.211.196:9092 bootstrap-servers: 121.43.127.44:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错 # 开启事务,必须在开启了事务的方法中发送,否则报错
transaction-id-prefix: kafkaTx- transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。 # 发生错误后消息重发的次数开启事务必须设置大于0。
@ -72,10 +72,10 @@ spring:
consumer: consumer:
# Kafka服务器 # Kafka服务器
bootstrap-servers: 115.159.211.196:9092 bootstrap-servers: 121.43.127.44:9092
group-id: firstGroup group-id: firstGroup
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s auto-commit-interval: 2s
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录 # earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录 # latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录