fix 完善报文解析

master
rouchen 2024-06-28 22:22:51 +08:00
parent 7ce022076d
commit c16028c96f
20 changed files with 371 additions and 64299 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,198 +0,0 @@
package com.muyu.iotDB.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;
/**
* IoTDB IotDBSessionConfig
*
* @author Yangle
* Date 2024/6/16 10:37
*/
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
private static Session session;
private static final String LOCAL_HOST = "127.0.0.1";
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
// 设置时区
session.setTimeZone("+08:00");
}
return session;
}
/**
* description: - insertRecord
* author: zhouhong
*
* @param * @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* time:
* measurementsList
* type BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6);
* valuesList ---
* @return
*/
public void insertRecordType(String deviceId, Long time, List<String> measurementsList, TSDataType type, List<Object> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() != valuesList.size()) {
throw new ServerException("measurementsList 与 valuesList 值不对应");
}
List<TSDataType> types = new ArrayList<>();
measurementsList.forEach(item -> {
types.add(type);
});
session.insertRecord(deviceId, time, measurementsList, types, valuesList);
}
/**
* description: - insertRecord
* author: zhouhong
*
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param measurementsList
* @param valuesList ---
* @return
*/
public void insertRecord(String deviceId, Long time, List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecord(deviceId, time, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: zhouhong
*/
public void insertRecords(List<String> deviceIdList, List<Long> timeList, List<List<String>> measurementsList, List<List<String>> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
if (measurementsList.size() == valuesList.size()) {
session.insertRecords(deviceIdList, timeList, measurementsList, valuesList);
} else {
log.error("measurementsList 与 valuesList 值不对应");
}
}
/**
* description:
* author: zhouhong
*
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param schemaList: + List<MeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("breath", TSDataType.INT64));
* @param maxRowNumber
* @return
*/
public void insertTablet(String deviceId, Long time, List<MeasurementSchema> schemaList, List<Object> valueList, int maxRowNumber) throws StatementExecutionException, IoTDBConnectionException {
Tablet tablet = new Tablet(deviceId, schemaList, maxRowNumber);
// 向iotdb里面添加数据
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, time);
for (int i = 0; i < valueList.size(); i++) {
tablet.addValue(schemaList.get(i).getMeasurementId(), rowIndex, valueList.get(i));
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
}
/**
* description: SQL
* author: zhouhong
*/
public SessionDataSet query(String sql) throws StatementExecutionException, IoTDBConnectionException {
return session.executeQueryStatement(sql);
}
/**
* description: root.a1eaKSRpRty
* author: zhouhong
*
* @param groupName
* @return
*/
public void deleteStorageGroup(String groupName) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroup(groupName);
}
/**
* description: Timeseries root.a1eaKSRpRty.CA3013A303A25467.breath
* author: zhouhong
*/
public void deleteTimeseries(String timeseries) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseries);
}
/**
* description: Timeseries
* author: zhouhong
*/
public void deleteTimeserieList(List<String> timeseriesList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteTimeseries(timeseriesList);
}
/**
* description:
* author: zhouhong
*/
public void deleteStorageGroupList(List<String> storageGroupList) throws StatementExecutionException, IoTDBConnectionException {
session.deleteStorageGroups(storageGroupList);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathAndEndTime(String path, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(path, endTime);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathListAndEndTime(List<String> pathList, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, endTime);
}
/**
* description:
* author: zhouhong
*/
public void deleteDataByPathListAndTime(List<String> pathList, Long startTime, Long endTime) throws StatementExecutionException, IoTDBConnectionException {
session.deleteData(pathList, startTime, endTime);
}
}

View File

@ -1,77 +0,0 @@
package com.muyu.iotDB.controller;
import com.muyu.iotDB.config.IotDBSessionConfig;
import com.muyu.iotDB.data.IotDbParam;
import com.muyu.iotDB.data.ResponseData;
import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.rmi.ServerException;
/**
* description: iotdb
* date: 2022/8/15 21:50
* author: zhouhong
*/
@Log4j2
@RestController
public class IotDbController {
@Resource
private IotDbServer iotDbServer;
@Resource
private IotDBSessionConfig iotDBSessionConfig;
/**
*
* @param iotDbParam
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(iotDbParam);
return ResponseData.success();
}
@PostMapping("/api/device/messageData")
public ResponseData messageDataAdd(@RequestBody MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException {
iotDbServer.add(messageData);
return ResponseData.success();
}
/**
*
*
*/
@PostMapping("/api/device/selectQueryData")
public ResponseData selectQueryData(@RequestParam String vin) throws Exception {
return ResponseData.success(iotDbServer.selectQueryData(vin));
}
/**
*
* @param iotDbParam
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
}
/**
*
* @return
*/
@PostMapping("/api/device/deleteGroup")
public ResponseData deleteGroup() throws StatementExecutionException, IoTDBConnectionException {
iotDBSessionConfig.deleteStorageGroup("root.a1eaKSRpRty");
iotDBSessionConfig.deleteStorageGroup("root.smartretirement");
return ResponseData.success();
}
}

View File

@ -1,71 +0,0 @@
package com.muyu.iotDB.data;
public class ErrorResponseData extends ResponseData{
private String exceptionClazz;
ErrorResponseData(String message) {
super(false, DEFAULT_ERROR_CODE, message, message, (Object)null);
}
public ErrorResponseData(Integer code, String message) {
super(false, code, message, message, (Object)null);
}
ErrorResponseData(Integer code, String message, Object object) {
super(false, code, message, object);
}
ErrorResponseData(Integer code, String message, String localizedMsg, Object object) {
super(false, code, message, localizedMsg, object);
}
public boolean equals(final Object o) {
if (o == this) {
return true;
} else if (!(o instanceof ErrorResponseData)) {
return false;
} else {
ErrorResponseData other = (ErrorResponseData)o;
if (!other.canEqual(this)) {
return false;
} else if (!super.equals(o)) {
return false;
} else {
Object this$exceptionClazz = this.getExceptionClazz();
Object other$exceptionClazz = other.getExceptionClazz();
if (this$exceptionClazz == null) {
if (other$exceptionClazz != null) {
return false;
}
} else if (!this$exceptionClazz.equals(other$exceptionClazz)) {
return false;
}
return true;
}
}
}
protected boolean canEqual(final Object other) {
return other instanceof ErrorResponseData;
}
public int hashCode() {
int result = super.hashCode();
Object $exceptionClazz = this.getExceptionClazz();
result = result * 59 + ($exceptionClazz == null ? 43 : $exceptionClazz.hashCode());
return result;
}
public String getExceptionClazz() {
return this.exceptionClazz;
}
public void setExceptionClazz(final String exceptionClazz) {
this.exceptionClazz = exceptionClazz;
}
public String toString() {
return "ErrorResponseData(exceptionClazz=" + this.getExceptionClazz() + ")";
}
}

View File

@ -1,40 +0,0 @@
package com.muyu.iotDB.data;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:53
* author: zhouhong
*/
@Data
public class IotDbParam {
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private Long time;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
/***
*
*/
private String startTime;
/***
*
*/
private String endTime;
}

View File

@ -1,33 +0,0 @@
package com.muyu.iotDB.data;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:56
* author: zhouhong
*/
@Data
public class IotDbResult {
/***
*
*/
private String time;
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
}

View File

@ -1,212 +0,0 @@
package com.muyu.iotDB.data;
import lombok.Data;
@Data
public class ResponseData {
public static final String DEFAULT_SUCCESS_MESSAGE = "请求成功";
public static final String DEFAULT_ERROR_MESSAGE = "网络异常";
public static final Integer DEFAULT_SUCCESS_CODE = 200;
public static final Integer DEFAULT_ERROR_CODE = 500;
private Boolean success;
private Integer code;
private String message;
private String localizedMsg;
private Object data;
public ResponseData() {
}
public ResponseData(Boolean success, Integer code, String message, Object data) {
this.success = success;
this.code = code;
this.message = message;
this.data = data;
}
public ResponseData(Boolean success, Integer code, String message, String localizedMsg, Object data) {
this.success = success;
this.code = code;
this.message = message;
this.localizedMsg = localizedMsg;
this.data = data;
}
public ResponseData(Boolean success, Integer code, String message) {
this.success = success;
this.code = code;
this.message = message;
}
public static SuccessResponseData success() {
return new SuccessResponseData();
}
public static SuccessResponseData success(Object object) {
return new SuccessResponseData(object);
}
public static SuccessResponseData success(Integer code, String message, Object object) {
return new SuccessResponseData(code, message, object);
}
public static SuccessResponseData success(Integer code, String message) {
return new SuccessResponseData(code, message);
}
public static SuccessResponseData success(Integer code, String message, String localizedMsg, Object object) {
return new SuccessResponseData(code, message, localizedMsg, object);
}
public static ErrorResponseData error(String message) {
return new ErrorResponseData(message);
}
public static ErrorResponseData error(Integer code, String message) {
return new ErrorResponseData(code, message);
}
public static ErrorResponseData error(Integer code, String message, Object object) {
return new ErrorResponseData(code, message, object);
}
public static ErrorResponseData error(Integer code, String message, String localizedMsg, Object object) {
return new ErrorResponseData(code, message, localizedMsg, object);
}
public Boolean getSuccess() {
return this.success;
}
public Integer getCode() {
return this.code;
}
public String getMessage() {
return this.message;
}
public String getLocalizedMsg() {
return this.localizedMsg;
}
public Object getData() {
return this.data;
}
public void setSuccess(final Boolean success) {
this.success = success;
}
public void setCode(final Integer code) {
this.code = code;
}
public void setMessage(final String message) {
this.message = message;
}
public void setLocalizedMsg(final String localizedMsg) {
this.localizedMsg = localizedMsg;
}
public void setData(final Object data) {
this.data = data;
}
public boolean equals(final Object o) {
if (o == this) {
return true;
} else if (!(o instanceof ResponseData)) {
return false;
} else {
ResponseData other = (ResponseData)o;
if (!other.canEqual(this)) {
return false;
} else {
label71: {
Object this$success = this.getSuccess();
Object other$success = other.getSuccess();
if (this$success == null) {
if (other$success == null) {
break label71;
}
} else if (this$success.equals(other$success)) {
break label71;
}
return false;
}
Object this$code = this.getCode();
Object other$code = other.getCode();
if (this$code == null) {
if (other$code != null) {
return false;
}
} else if (!this$code.equals(other$code)) {
return false;
}
label57: {
Object this$message = this.getMessage();
Object other$message = other.getMessage();
if (this$message == null) {
if (other$message == null) {
break label57;
}
} else if (this$message.equals(other$message)) {
break label57;
}
return false;
}
Object this$localizedMsg = this.getLocalizedMsg();
Object other$localizedMsg = other.getLocalizedMsg();
if (this$localizedMsg == null) {
if (other$localizedMsg != null) {
return false;
}
} else if (!this$localizedMsg.equals(other$localizedMsg)) {
return false;
}
Object this$data = this.getData();
Object other$data = other.getData();
if (this$data == null) {
if (other$data == null) {
return true;
}
} else if (this$data.equals(other$data)) {
return true;
}
return false;
}
}
}
protected boolean canEqual(final Object other) {
return other instanceof ResponseData;
}
public int hashCode() {
int result1 = 1;
Object $success = this.getSuccess();
int result = result1 * 59 + ($success == null ? 43 : $success.hashCode());
Object $code = this.getCode();
result = result * 59 + ($code == null ? 43 : $code.hashCode());
Object $message = this.getMessage();
result = result * 59 + ($message == null ? 43 : $message.hashCode());
Object $localizedMsg = this.getLocalizedMsg();
result = result * 59 + ($localizedMsg == null ? 43 : $localizedMsg.hashCode());
Object $data = this.getData();
result = result * 59 + ($data == null ? 43 : $data.hashCode());
return result;
}
public String toString() {
return "ResponseData(success=" + this.getSuccess() + ", code=" + this.getCode() + ", message=" + this.getMessage() + ", localizedMsg=" + this.getLocalizedMsg() + ", data=" + this.getData() + ")";
}
}

View File

@ -1,23 +0,0 @@
package com.muyu.iotDB.data;
public class SuccessResponseData extends ResponseData{
public SuccessResponseData() {
super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", (Object)null);
}
public SuccessResponseData(Object object) {
super(true, DEFAULT_SUCCESS_CODE, "请求成功", "请求成功", object);
}
public SuccessResponseData(Integer code, String message, Object object) {
super(true, code, message, message, object);
}
public SuccessResponseData(Integer code, String message, String localizedMsg, Object object) {
super(true, code, message, localizedMsg, object);
}
public SuccessResponseData(Integer code, String message) {
super(true, code, message);
}
}

View File

@ -1,24 +0,0 @@
package com.muyu.iotDB.service;
import com.muyu.iotDB.data.IotDbParam;
import com.muyu.mqtt.dao.MessageData;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
/**
* IotDbServer
*
* @author Yangle
* Date 2024/6/16 10:41
*/
public interface IotDbServer {
void insertData(IotDbParam iotDbParam) throws ServerException, IoTDBConnectionException, StatementExecutionException;
Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
void add(MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException;
Object selectQueryData(String vin) throws IoTDBConnectionException, StatementExecutionException;
}

View File

@ -1,243 +0,0 @@
package com.muyu.iotDB.service.impl;
import com.muyu.iotDB.config.IotDBSessionConfig;
import com.muyu.iotDB.data.IotDbParam;
import com.muyu.iotDB.data.IotDbResult;
import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* description: iot
* date: 2022/8/15 9:43
* author: zhouhong
*/
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(IotDbParam iotDbParam) throws ServerException, IoTDBConnectionException, StatementExecutionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("heart");
measurementsList.add("breath");
List<String> valuesList = new ArrayList<>();
valuesList.add(String.valueOf(iotDbParam.getHeart()));
valuesList.add(String.valueOf(iotDbParam.getBreath()));
iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
}
@Override
public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
List<IotDbResult> iotDbResultList = new ArrayList<>();
if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空");
}
return iotDbResultList;
}
@Override
public void add(MessageData messageData) throws ServerException, IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.bizkey."+ messageData.getVin();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("vin");
measurementsList.add("getTimestamp");
measurementsList.add("longitude");
measurementsList.add("latitude");
measurementsList.add("speed");
measurementsList.add("mileage");
measurementsList.add("dischargeVoltage");
measurementsList.add("dischargeCurrent");
measurementsList.add("insulationResistance");
measurementsList.add("gear");
measurementsList.add("accelerationPedal");
measurementsList.add("brakePedal");
measurementsList.add("fuelConsumption");
measurementsList.add("motorControllerTemperature");
measurementsList.add("motorSpeed");
measurementsList.add("motorTorque");
measurementsList.add("motorTemperature");
measurementsList.add("motorVoltage");
measurementsList.add("motorCurrent");
measurementsList.add("powerBattery");
measurementsList.add("maxFeedbackPower");
measurementsList.add("maxDischargePower");
measurementsList.add("bmsSelfCheck");
measurementsList.add("powerBatteryCurrent");
measurementsList.add("powerBatteryV3");
measurementsList.add("maxVoltage");
measurementsList.add("minVoltage");
measurementsList.add("maxTemperature");
measurementsList.add("minTemperature");
measurementsList.add("availableCapacity");
measurementsList.add("vehicleStatus");
measurementsList.add("chargeStatus");
measurementsList.add("runStatus");
measurementsList.add("soc");
measurementsList.add("chargeWorkStatus");
measurementsList.add("driveMotorStatus");
measurementsList.add("location");
measurementsList.add("eas");
measurementsList.add("ptc");
measurementsList.add("eps");
measurementsList.add("abs");
measurementsList.add("mcu");
measurementsList.add("powerBatteryHeating");
measurementsList.add("powerBatteryCurrentStatus");
measurementsList.add("powerBatteryHeat");
measurementsList.add("dcdc");
measurementsList.add("chg");
List<String> valuesList = new ArrayList<>();
valuesList.add(messageData.getVin());
valuesList.add(messageData.getTimestamp());
valuesList.add(messageData.getLongitude());
valuesList.add(messageData.getLatitude());
valuesList.add(messageData.getSpeed());
valuesList.add(messageData.getMileage());
valuesList.add(messageData.getDischargeVoltage());
valuesList.add(messageData.getDischargeCurrent());
valuesList.add(messageData.getInsulationResistance());
valuesList.add(messageData.getGear());
valuesList.add(messageData.getAccelerationPedal());
valuesList.add(messageData.getBrakePedal());
valuesList.add(messageData.getFuelConsumption());
valuesList.add(messageData.getMotorControllerTemperature());
valuesList.add(messageData.getMotorSpeed());
valuesList.add(messageData.getMotorTorque());
valuesList.add(messageData.getMotorTemperature());
valuesList.add(messageData.getMotorVoltage());
valuesList.add(messageData.getMotorCurrent());
valuesList.add(messageData.getPowerBattery());
valuesList.add(messageData.getMaxFeedbackPower());
valuesList.add(messageData.getMaxDischargePower());
valuesList.add(messageData.getBmsSelfCheck());
valuesList.add(messageData.getPowerBatteryCurrent());
valuesList.add(messageData.getPowerBatteryV3());
valuesList.add(messageData.getMaxVoltage());
valuesList.add(messageData.getMinVoltage());
valuesList.add(messageData.getMaxTemperature());
valuesList.add(messageData.getMinTemperature());
valuesList.add(messageData.getAvailableCapacity());
valuesList.add(messageData.getVehicleStatus());
valuesList.add(messageData.getChargeStatus());
valuesList.add(messageData.getRunStatus());
valuesList.add(messageData.getSoc());
valuesList.add(messageData.getChargeWorkStatus());
valuesList.add(messageData.getDriveMotorStatus());
valuesList.add(messageData.getLocation());
valuesList.add(messageData.getEas());
valuesList.add(messageData.getPtc());
valuesList.add(messageData.getEps());
valuesList.add(messageData.getAbs());
valuesList.add(messageData.getMcu());
valuesList.add(messageData.getPowerBatteryHeating());
valuesList.add(messageData.getPowerBatteryCurrentStatus());
valuesList.add(messageData.getPowerBatteryHeat());
valuesList.add(messageData.getDcdc());
valuesList.add(messageData.getChg());
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(messageData.getTimestamp()), measurementsList, valuesList);
}
/**
* vin
* @param vin
* @return
*/
@Override
public List<MessageData> selectQueryData(String vin) throws IoTDBConnectionException, StatementExecutionException {
// List<IotDbResult> iotDbResultList = new ArrayList<>();
//
// if (null !=vin) {
// String sql = "select * from root.bizkey."+ vin ;
// SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
// List<String> columnNames = sessionDataSet.getColumnNames();
// List<String> titleList = new ArrayList<>();
// // 排除Time字段 -- 方便后面后面拼装数据
// for (int i = 1; i < columnNames.size(); i++) {
// String[] temp = columnNames.get(i).split("\\.");
// titleList.add(temp[temp.length - 1]);
// }
// // 封装处理数据
// packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
// } else {
// log.info("PK或者SN不能为空");
// }
// return iotDbResultList;
// } }
return null;
}
/**
*
* @param iotDbParam
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
IotDbResult iotDbResult = new IotDbResult();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
iotDbResult.setTime(timeString);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
iotDbResult.setTime(timeString);
iotDbResult.setPk(iotDbParam.getPk());
iotDbResult.setSn(iotDbParam.getSn());
iotDbResult.setHeart(map.get("heart"));
iotDbResult.setBreath(map.get("breath"));
iotDbResultList.add(iotDbResult);
}
}
}
}

View File

@ -0,0 +1,83 @@
package com.muyu.iotdb.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;
/**
* description: iotdb
* root.a1eaKSRpRty.CA3013A303A25467
* root.a1eaKSRpRty.CA3013A303A25467.heart root.a1eaKSRpRty
* author: YangLe
*/
@Log4j2
@Component
@Configuration
public class IotDBSessionConfig {
private static Session session;
private static final String LOCAL_HOST = "47.93.162.81";
@Bean
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
// 设置时区
session.setTimeZone("+08:00");
}
return session;
}
/**
* description: - insertRecord
* author: YangLe
*
* @param * @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* time:
* measurementsList
* type BOOLEAN((byte)0), INT32((byte)1),INT64((byte)2),FLOAT((byte)3),DOUBLE((byte)4),TEXT((byte)5),VECTOR((byte)6);
* valuesList ---
* @return
*/
public void insertRecordType(String deviceId, Long time, List<String> measurementsList, TSDataType type, List<Object> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
List<TSDataType> types = new ArrayList<>();
measurementsList.forEach(item -> {
types.add(type);
});
session.insertRecord(deviceId, time, measurementsList, types, valuesList);
}
/**
* description: - insertRecord
* author: YangLe
*
* @param deviceId:root.a1eaKSRpRty.CA3013A303A25467
* @param time:
* @param measurementsList
* @param valuesList ---
* @return
*/
public void insertRecord(String deviceId, Long time, List<String> measurementsList, List<String> valuesList) throws StatementExecutionException, IoTDBConnectionException, ServerException {
session.insertRecord(deviceId, time, measurementsList, valuesList);
}
}

View File

@ -1,109 +1,110 @@
package com.muyu.kafka;
import com.alibaba.fastjson.JSON;
import com.muyu.iotDB.service.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.rmi.ServerException;
import java.util.*;
@Log4j2
@Service
public class KafkaPCUtils {
@Resource
private IotDbServer iotDbServer;
@Autowired
private RedisTemplate<String,String> redisTemplate;
public void sendCallbackOneMessage(String topic, MessageData vehicle) {
String vin = vehicle.getVin();
String vehicleString = JSON.toJSONString(vehicle);
log.info("向主题:[{}],发送消息:{}", topic, vehicle);
String bootstrapServers = "127.0.0.1:9092";
// 设置生产者属性
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 创建分区
AdminClient adminClient = AdminClient.create(properties);
CreatePartitionsResult partitions = adminClient.createPartitions(new HashMap() {{
put(topic, NewPartitions.increaseTo(8, null));
}});
// 节点集合
List<Node> list = new ArrayList<>();
list.add(new Node(0, "127.0.0.1", 9092));
// 节点分区列表
List<PartitionInfo> partitionInfos = new ArrayList<>();
for (int i = 0; i < 8; i++) {
partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
}
int partition = new CustomPartitioner().partition(
topic,
vin,
vin.getBytes(),
vehicleString,
vehicleString.getBytes(),
new Cluster(
"uqU0Vo8_TiaV2Xp0kzczaA",
list,
partitionInfos,
new HashSet<String>() {},
new HashSet<>())
);
log.info("当前获取分区:[{}]",partition);
// 创建消息 主题 key 分区 值
ProducerRecord<String, String> record = new ProducerRecord<>(topic,partition, vin, vehicleString);
record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));
// 发送消息 有回执
producer.send(record, (matadata, exception) -> {
if (exception == null) {
// int partition = matadata.partition();
log.info("消息发送成功,topic:[{}],分区为:[{}]", topic, partition);
} else {
log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage());
}
});
// 关闭生产者
producer.close();
}
@KafkaListener(topics = {"test1"},groupId = "Topics")
public void onNormalMessage1(ConsumerRecord<String, Object> record) throws ServerException, IoTDBConnectionException, StatementExecutionException {
String value = (String) record.value();
MessageData messageData = JSON.parseObject(value, MessageData.class);
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
iotDbServer.add(messageData);
}
}
//package com.muyu.kafka;
//
//import com.alibaba.fastjson.JSON;
//import com.muyu.iotdb.server.IotDbServer;
//import com.muyu.mqtt.dao.MessageData;
//import lombok.extern.log4j.Log4j2;
//import org.apache.iotdb.rpc.IoTDBConnectionException;
//import org.apache.iotdb.rpc.StatementExecutionException;
//import org.apache.kafka.clients.admin.AdminClient;
//import org.apache.kafka.clients.admin.CreatePartitionsResult;
//import org.apache.kafka.clients.admin.NewPartitions;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.apache.kafka.clients.producer.KafkaProducer;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.apache.kafka.common.Cluster;
//import org.apache.kafka.common.Node;
//import org.apache.kafka.common.PartitionInfo;
//import org.apache.kafka.common.header.internals.RecordHeader;
//import org.apache.kafka.common.serialization.StringSerializer;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import java.nio.charset.StandardCharsets;
//import java.rmi.ServerException;
//import java.util.*;
//
//@Log4j2
//@Service
//public class KafkaPCUtils {
// @Resource
// private IotDbServer iotDbServer;
//
// @Autowired
// private RedisTemplate<String,String> redisTemplate;
//
// public void sendCallbackOneMessage(String topic, MessageData vehicle) {
// String vin = vehicle.getVin();
// String vehicleString = JSON.toJSONString(vehicle);
// log.info("向主题:[{}],发送消息:{}", topic, vehicle);
// String bootstrapServers = "127.0.0.1:9092";
//
// // 设置生产者属性
// Properties properties = new Properties();
// properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// // 创建生产者
// KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// // 创建分区
// AdminClient adminClient = AdminClient.create(properties);
// CreatePartitionsResult partitions = adminClient.createPartitions(new HashMap() {{
// put(topic, NewPartitions.increaseTo(8, null));
// }});
// // 节点集合
// List<Node> list = new ArrayList<>();
// list.add(new Node(0, "127.0.0.1", 9092));
// // 节点分区列表
// List<PartitionInfo> partitionInfos = new ArrayList<>();
// for (int i = 0; i < 8; i++) {
// partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
// }
//
// int partition = new CustomPartitioner().partition(
// topic,
// vin,
// vin.getBytes(),
// vehicleString,
// vehicleString.getBytes(),
// new Cluster(
// "uqU0Vo8_TiaV2Xp0kzczaA",
// list,
// partitionInfos,
// new HashSet<String>() {},
// new HashSet<>())
// );
// log.info("当前获取分区:[{}]",partition);
// // 创建消息 主题 key 分区 值
// ProducerRecord<String, String> record = new ProducerRecord<>(topic,partition, vin, vehicleString);
// record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8)));
//
// // 发送消息 有回执
// producer.send(record, (matadata, exception) -> {
// if (exception == null) {
//// int partition = matadata.partition();
// log.info("消息发送成功,topic:[{}],分区为:[{}]", topic, partition);
//
// } else {
// log.info("消息发送失败,topic:[{}],异常信息:[{}]", topic, exception.getMessage());
// }
// });
// // 关闭生产者
// producer.close();
// }
//
//
//// @KafkaListener(topics = {"test1"},groupId = "Topics")
//// public void onNormalMessage1(ConsumerRecord<String, Object> record) throws ServerException, IoTDBConnectionException, StatementExecutionException {
//// String value = (String) record.value();
//// log.error("value:{}",value);
//// MessageData messageData = JSON.parseObject(value, MessageData.class);
//// System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
//// record.value());
//// iotDbServer.add(messageData);
//// }
//
//}

View File

@ -1,7 +1,9 @@
package com.muyu.kafka;
import com.muyu.iotDB.service.IotDbServer;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.muyu.iotdb.server.IotDbServer;
import com.muyu.mqtt.dao.MessageData;
import lombok.extern.log4j.Log4j2;
@ -36,30 +38,29 @@ public class SimpleKafkaConsumer {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
String value = record.value();
log.info("value:{}", value);
MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
log.info("messageData1:{}", messageData1);
iotDbServer.add(messageData1);
JSONObject jsonObject = JSON.parseObject(value);
iotDbServer.insertData(jsonObject);
} catch (Exception e) {
log.error("Error consuming Kafka message", e);
// 处理异常,可能需要重试或其他逻辑
}
}
@KafkaListener(topics = "test1", groupId = "Topics")
public void consume2(ConsumerRecord<String, String> record) {
log.info("开始消费");
try {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
String value = record.value();
log.info("value:{}", value);
MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
log.info("messageData1:{}", messageData1);
iotDbServer.add(messageData1);
} catch (Exception e) {
log.error("Error consuming Kafka message", e);
// 处理异常,可能需要重试或其他逻辑
}
}
// @KafkaListener(topics = "test1", groupId = "Topics")
// public void consume2(ConsumerRecord<String, String> record) {
// log.info("开始消费");
// try {
// System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// String value = record.value();
// log.info("value:{}", value);
// MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class);
// log.info("messageData1:{}", messageData1);
// iotDbServer.add(messageData1);
// } catch (Exception e) {
// log.error("Error consuming Kafka message", e);
// // 处理异常,可能需要重试或其他逻辑
// }
// }
}

View File

@ -1,12 +1,15 @@
package com.muyu.mqtt;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.muyu.mqtt.dao.MessageDa;
import com.muyu.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@ -19,10 +22,12 @@ import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.muyu.utils.ConversionUtil.hexStringToString;
/**
* MessageCallbackService
@ -52,25 +57,23 @@ public class MessageCallbackService implements MqttCallback {
private RedisTemplate<String, String> redisTemplate;
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("topic:{}", topic);
log.info("Qos:{}", mqttMessage.getQos());
log.info("message content:{}", new String(mqttMessage.getPayload()));
String s = new String(mqttMessage.getPayload());
// MessageData main = ConversionUtil.main(s);
JSONObject object = messageParsing(s);
log.error("message:{}",object);
// 准备ProducerRecord并发送到Kafka
String kafkaTopic = topic; // 假设MQTT主题与Kafka主题相同
String vin = object.getString("vin");
System.out.println("VIN: " + vin);
String key = vin; // 使用vin作为key如果适用
String value = JSON.toJSONString(object);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(kafkaTopic, key, value);
kafkaTemplate.send(producerRecord);// 注意这里使用get()会阻塞直到发送完成,实际应用中可能需要异步处理
public void messageArrived(String topic, MqttMessage message) {
// 将消息内容从字节数组转换为字符串
String s = new String(message.getPayload());
try {
String s1 = ConversionUtil.hexStringToString(s);
JSONObject object = getObject(s1);
log.error("object:{}", object);
String vin ="";
String key = vin; // 使用vin作为key如果适用
String value = com.alibaba.fastjson.JSON.toJSONString(object);
log.error("value:{}", value);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
kafkaTemplate.send(producerRecord);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
@ -78,46 +81,52 @@ public class MessageCallbackService implements MqttCallback {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public com.alibaba.fastjson2.JSONObject getObject(String message) {
log.info("原始数据:{}", message);
// 移除字符串message的首尾字符为后续处理准备
message = message.substring(1, message.length() - 2);
// 初始化StringBuilder用于构建最终的JSON字符串
StringBuffer buffer = new StringBuffer();
// 从Redis中获取指定key的值这里的key是message的前17个字符
Object o = redisTemplate.opsForHash().get("VIN", message.substring(0, 17));
// 将获取到的值转换为VehicleDateModel的列表
List<MessageDa> messageDas = new ArrayList<>();
messageDas = JSON.parseObject(o.toString(), ArrayList.class).stream()
.map(obj -> JSON.parseObject(obj.toString(), MessageDa.class)).toList();
// 用于构建最终JSON字符串的变量
String finalMessage = message;
// 遍历车辆数据模型列表构建JSON字符串的键值对
messageDas.forEach(messageDa -> {
// 根据车辆数据模型的起始和结束位置以及特定的处理逻辑构建JSON字符串的键值对并追加到buffer中
buffer.append(",\"" + messageDa.getAnalyzeKey() + "\":" +
"\"" + remove0(finalMessage.substring(messageDa.getAnalyzeStart() - 1, messageDa.getEnt())) + "\"");
});
// 构建最终的JSON字符串
String jsonString = "{" + buffer.substring(1) + "}";
com.alibaba.fastjson2.JSONObject jsonObject = JSON.parseObject(jsonString);
try {
private JSONObject messageParsing(String s) {
// 假设hexStringToString是一个将十六进制字符串转换为普通字符串的方法
String hexStringToString = hexStringToString(s);
// 提取字符串中的特定部分
String substring = hexStringToString.substring(1, hexStringToString.length() - 2);
String vin = substring.substring(0, 17);
} catch (Exception e) {
throw new RuntimeException(e);
}
// 日志记录解析后的数据
log.info("解析后的数据:{}", jsonString);
// 从本地缓存中获取VIN对应的信息
String cachedResult = localCache.getIfPresent(vin);
JSONObject result = null;
if (cachedResult == null) {
// 如果本地缓存中没有找到则从Redis中获取
String redisList = redisTemplate.opsForValue().get(vin);
log.error("redisList:{}", redisList);
JSONArray jsonArray = JSON.parseArray(redisList);
// 返回构建好的JSONObject
return new com.alibaba.fastjson2.JSONObject(jsonObject);
}
// 创建一个空的JSONObject来存储合并后的数据
result = new JSONObject();
// 遍历JSONArray中的每个JSONObject并将其合并到result中
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject item = jsonArray.getJSONObject(i);
String analyzeKey = item.getString("analyzeKey");
int analyzeStart = item.getInteger("analyzeStart");
int ent = item.getInteger("ent");
String extractedValue = substring.substring(analyzeStart, ent);
result.put(analyzeKey, extractedValue);
}
log.error("result:{}", result);
// 将结果转换为JSON字符串然后存储到本地缓存中
localCache.put(vin, result.toJSONString());
// 移除字符串中的前导零
public String remove0(String str) {
if (str.length() > 1) {
if (str.charAt(0) == '0') {
return remove0(str.substring(1));
} else {
// 如果本地缓存中有数据将其解析为JSONObject
result = JSON.parseObject(cachedResult);
return str;
}
return result;
} else {
return str;
}
}
@RabbitListener(queues = "subscription")
@ -127,13 +136,14 @@ public class MessageCallbackService implements MqttCallback {
// 删除本地缓存
localCache.invalidate(vin);
}
/**
*
*/
Cache<String, String> localCache = Caffeine.newBuilder()
.initialCapacity(5)
.maximumSize(10)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}