feat():iotDB查询车辆信息

master
Saisai Liu 2024-06-18 10:13:41 +08:00
parent 45bd7a4897
commit 78d86a2a69
31 changed files with 840 additions and 313 deletions

View File

@ -0,0 +1,14 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliArrayNamingShouldHaveBracket" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliControlFlowStatementWithoutBraces" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliDeprecation" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliEqualsAvoidNull" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliLongLiteralsEndingWithLowercaseL" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliMissingOverrideAnnotation" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AliWrapperTypeEquality" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="MapOrSetKeyShouldOverrideHashCodeEquals" enabled="true" level="WARNING" enabled_by_default="true" />
</profile>
</component>

View File

@ -1,3 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
@ -6,6 +7,11 @@
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="ignoredFiles">
<set>
<option value="$PROJECT_DIR$/mobai-event-iotDBDemo/pom.xml" />
</set>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />

View File

@ -18,6 +18,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mobai</groupId>

View File

@ -1,27 +0,0 @@
package com.mobai.vehicle.event.client.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("175.24.138.82");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

View File

@ -1,9 +1,8 @@
server:
port: 10001
kafka:
topic: vehicle-event-topic0
partition: 0
port: 8084
spring:
redis:
host: 127.0.0.1
rabbitmq:
host: 175.24.138.82
stream:

View File

@ -0,0 +1,208 @@
package com.mobai.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
/**
*
* @author Mobai
* @className Vehicle
* @description
* @date 2024/6/6 8:18
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {
/**
* vin
*/
private String vin;
/**
*
*/
private Long startTime;
/**
*
*/
private BigDecimal longitude;
/**
*
*/
private BigDecimal latitude;
/**
*
*/
private BigDecimal speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private BigDecimal voltage;
/**
*
*/
private BigDecimal current;
/**
*
*/
private BigDecimal resistance;
/**
*
*/
private String gear;
/**
*
*/
private BigDecimal accelerationPedal;
/**
*
*/
private BigDecimal brakePedal;
/**
*
*/
private BigDecimal fuelConsumptionRate;
/**
*
*/
private BigDecimal motorControllerTemperature;
/**
*
*/
private BigDecimal motorSpeed;
/**
*
*/
private BigDecimal motoTorque;
/**
*
*/
private BigDecimal motorTemperature;
/**
*
*/
private BigDecimal motorVoltage;
/**
*
*/
private BigDecimal motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private BigDecimal maximumFeedbackPower;
/**
*
*/
private BigDecimal maximumDischargePower;
/**
* BMS
*/
private BigDecimal selfCheckCounter;
/**
*
*/
private BigDecimal totalBatteryCurrent;
/**
* V3
*/
private BigDecimal totalBatteryVoltage;
/**
*
*/
private BigDecimal singleBatteryMaxVoltage;
/**
*
*/
private BigDecimal singleBatteryMinVoltage;
/**
*
*/
private BigDecimal singleBatteryMaxTemperature;
/**
*
*/
private BigDecimal singleBatteryMinTemperature;
/**
*
*/
private BigDecimal availableBatteryCapacity;
/**
*
*/
private Integer vehicleStatus;
/**
*
*/
private Integer chargingStatus;
/**
*
*/
private Integer operatingStatus;
/**
* SOC
*/
private Integer chargingEnergyStorageStatus;
/**
*
*/
private Integer driveMotorStatus;
/**
*
*/
private Integer positionStatus;
/**
* EAS
*/
private Integer easStatus;
/**
* PTC
*/
private Integer ptcStatus;
/**
* EPS
*/
private Integer epsStatus;
/**
* ABS
*/
private Integer absStatus;
/**
* MCU
*/
private Integer mcuStatus;
/**
*
*/
private Integer heatingStatus;
/**
*
*/
private Integer batteryStatus;
/**
*
*/
private Integer batteryInsulationStatus;
/**
* DCDC
*/
private Integer dcdcStatus;
/**
* CHG
*/
private Integer chgStatus;
}

View File

@ -0,0 +1,34 @@
package com.mobai.req;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author Mobai
* @className VehicleReq
* @description
* @date 2024/6/17 19:24
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VehicleReq {
/**
*
*/
private String vin;
/**
*
*/
private Long startTime;
/**
*
*/
private Long endTime;
}

View File

@ -0,0 +1,46 @@
vin
startTime
longitude
latitude
speed
mileage
voltage
current
resistance
gear
accelerationPedal
brakePedal
fuelConsumptionRate
motorControllerTemperature
motorSpeed
motoTorque
motorTemperature
motorVoltage
motorCurrent
remainingBattery
maximumFeedbackPower
maximumDischargePower
selfCheckCounter
totalBatteryCurrent
totalBatteryVoltage
singleBatteryMaxVoltage
singleBatteryMinVoltage
singleBatteryMaxTemperature
singleBatteryMinTemperature
availableBatteryCapacity
vehicleStatus
chargingStatus
operatingStatus
chargingEnergyStorageStatus
driveMotorStatus
positionStatus
easStatus
ptcStatus
epsStatus
absStatus
mcuStatus
heatingStatus
batteryStatus
batteryInsulationStatus
dcdcStatus
chgStatus

View File

@ -1,68 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.mobai</groupId>
<artifactId>event-analysis</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>mobai-event-iotDBDemo</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.14.0-preview1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.mobai</groupId>
<artifactId>mobai-event-common</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,13 +0,0 @@
package com.mobai;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class IotDBApplication {
public static void main(String[] args) {
SpringApplication.run(IotDBApplication.class);
}
}

View File

@ -1,33 +0,0 @@
package com.mobai.domain;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:56
* author: zhouhong
*/
@Data
public class IotDbResult {
/***
*
*/
private String time;
/***
* PK
*/
private String pk;
/***
*
*/
private String sn;
/***
*
*/
private String breath;
/***
*
*/
private String heart;
}

View File

@ -1,21 +0,0 @@
package com.mobai.service;
import com.mobai.domain.IotDbParam;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
/**
* @ClassName IotDbServer
* @Description
* @Author Mobai
* @Date 2024/6/17 17:20
*/
public interface IotDbServer {
void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException;
Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
}

View File

@ -1,107 +0,0 @@
package com.mobai.service.impl;
import com.mobai.config.IotDBSessionConfig;
import com.mobai.domain.IotDbParam;
import com.mobai.domain.IotDbResult;
import com.mobai.service.IotDbServer;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* description: iot
* date: 2022/8/15 9:43
* author: zhouhong
*/
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>();
measurementsList.add("heart");
measurementsList.add("breath");
List<String> valuesList = new ArrayList<>();
valuesList.add(String.valueOf(iotDbParam.getHeart()));
valuesList.add(String.valueOf(iotDbParam.getBreath()));
iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
}
@Override
public List<IotDbResult> queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
List<IotDbResult> iotDbResultList = new ArrayList<>();
if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
+ iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("PK或者SN不能为空");
}
return iotDbResultList;
}
/**
*
* @param iotDbParam
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(IotDbParam iotDbParam, List<IotDbResult> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
IotDbResult iotDbResult = new IotDbResult();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
iotDbResult.setTime(timeString);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
iotDbResult.setTime(timeString);
iotDbResult.setPk(iotDbParam.getPk());
iotDbResult.setSn(iotDbParam.getSn());
iotDbResult.setHeart(map.get("heart"));
iotDbResult.setBreath(map.get("breath"));
iotDbResultList.add(iotDbResult);
}
}
}
}

View File

@ -1,11 +0,0 @@
server:
port: 8085
spring:
redis:
host: 127.0.0.1
rabbitmq:
host: 175.24.138.82
stream:
username: guest
password: guest

View File

@ -18,15 +18,69 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mobai</groupId>
<artifactId>mobai-event-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.14.0-preview1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,4 @@
package com.mobai.vehicle.event.client;
package com.mobai;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -10,8 +10,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @date 2024/6/14 20:40
*/
@SpringBootApplication
public class EventClientApplication {
public class EventServiceApplication {
public static void main(String[] args) {
SpringApplication.run(EventClientApplication.class);
SpringApplication.run(EventServiceApplication.class);
}
}

View File

@ -1,4 +1,4 @@
package com.mobai.config;
package com.mobai.iotDB.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@ -34,7 +34,13 @@ public class IotDBSessionConfig {
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
session = new Session.Builder()
.host(LOCAL_HOST)
.port(6667)
.username("root")
.password("root")
.version(Version.V_0_13)
.build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");

View File

@ -1,14 +1,16 @@
package com.mobai.controller;
package com.mobai.iotDB.controller;
import com.mobai.config.IotDBSessionConfig;
import com.mobai.domain.IotDbParam;
import com.mobai.domain.resp.ResponseData;
import com.mobai.service.IotDbServer;
import com.mobai.domain.Vehicle;
import com.mobai.iotDB.config.IotDBSessionConfig;
import com.mobai.iotDB.domain.resp.ResponseData;
import com.mobai.iotDB.service.IotDbServer;
import com.mobai.req.VehicleReq;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.rmi.ServerException;
@ -29,21 +31,21 @@ public class IotDbController {
/**
*
* @param iotDbParam
* @param vehicle
*/
@PostMapping("/api/device/insert")
public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(iotDbParam);
public ResponseData insert(@RequestBody Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
iotDbServer.insertData(vehicle);
return ResponseData.success();
}
/**
*
* @param iotDbParam
*
* @param vehicle
*/
@PostMapping("/api/device/queryData")
public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
public ResponseData queryDataFromIotDb(@RequestBody VehicleReq vehicle) throws Exception {
return ResponseData.success(iotDbServer.queryDataFromIotDb(vehicle));
}
/**

View File

@ -1,4 +1,4 @@
package com.mobai.domain;
package com.mobai.iotDB.domain;
import lombok.Data;
/**

View File

@ -0,0 +1,23 @@
package com.mobai.iotDB.domain;
import lombok.Data;
/**
* description:
* date: 2022/8/15 21:56
* author: zhouhong
*/
@Data
public class IotDbResult {
/***
*
*/
private String startTime;
/***
*
*/
private String VIN;
}

View File

@ -1,4 +1,4 @@
package com.mobai.domain.resp;
package com.mobai.iotDB.domain.resp;
import lombok.AllArgsConstructor;
import lombok.Data;

View File

@ -0,0 +1,23 @@
package com.mobai.iotDB.service;
import com.mobai.domain.Vehicle;
import com.mobai.iotDB.domain.IotDbParam;
import com.mobai.req.VehicleReq;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import java.rmi.ServerException;
/**
* @ClassName IotDbServer
* @Description
* @Author Mobai
* @Date 2024/6/17 17:20
*/
public interface IotDbServer {
void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException;
Object queryDataFromIotDb(VehicleReq req) throws Exception;
}

View File

@ -0,0 +1,252 @@
package com.mobai.iotDB.service.impl;
import com.mobai.domain.Vehicle;
import com.mobai.iotDB.config.IotDBSessionConfig;
import com.mobai.iotDB.service.IotDbServer;
import com.mobai.req.VehicleReq;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.rmi.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* description: iot
* date: 2022/8/15 9:43
* author: zhouhong
*/
@Log4j2
@Service
public class IotDbServerImpl implements IotDbServer {
@Resource
private IotDBSessionConfig iotDBSessionConfig;
@Override
public void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
// iotDbParam: 模拟设备上报消息
// bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
String deviceId = "root.vin." + vehicle.getVin();
// 将设备上报的数据存入数据库(时序数据库)
List<String> measurementsList = new ArrayList<>() {{
add("startTime");
add("longitude");
add("latitude");
add("speed");
add("mileage");
add("voltage");
add("current");
add("resistance");
add("gear");
add("accelerationPedal");
add("brakePedal");
add("fuelConsumptionRate");
add("motorControllerTemperature");
add("motorSpeed");
add("motoTorque");
add("motorTemperature");
add("motorVoltage");
add("motorCurrent");
add("remainingBattery");
add("maximumFeedbackPower");
add("maximumDischargePower");
add("selfCheckCounter");
add("totalBatteryCurrent");
add("totalBatteryVoltage");
add("singleBatteryMaxVoltage");
add("singleBatteryMinVoltage");
add("singleBatteryMaxTemperature");
add("singleBatteryMinTemperature");
add("availableBatteryCapacity");
add("vehicleStatus");
add("chargingStatus");
add("operatingStatus");
add("chargingEnergyStorageStatus");
add("driveMotorStatus");
add("positionStatus");
add("easStatus");
add("ptcStatus");
add("epsStatus");
add("absStatus");
add("mcuStatus");
add("heatingStatus");
add("batteryStatus");
add("batteryInsulationStatus");
add("dcdcStatus");
add("chgStatus");
}};
//车辆具体数据
List<String> valuesList = new ArrayList<>() {{
add(String.valueOf(vehicle.getStartTime()));
add(String.valueOf(vehicle.getLongitude()));
add(String.valueOf(vehicle.getLatitude()));
add(String.valueOf(vehicle.getSpeed()));
add(String.valueOf(vehicle.getMileage()));
add(String.valueOf(vehicle.getVoltage()));
add(String.valueOf(vehicle.getCurrent()));
add(String.valueOf(vehicle.getResistance()));
add(String.valueOf(vehicle.getGear()));
add(String.valueOf(vehicle.getAccelerationPedal()));
add(String.valueOf(vehicle.getBrakePedal()));
add(String.valueOf(vehicle.getFuelConsumptionRate()));
add(String.valueOf(vehicle.getMotorControllerTemperature()));
add(String.valueOf(vehicle.getMotorSpeed()));
add(String.valueOf(vehicle.getMotoTorque()));
add(String.valueOf(vehicle.getMotorTemperature()));
add(String.valueOf(vehicle.getMotorVoltage()));
add(String.valueOf(vehicle.getMotorCurrent()));
add(String.valueOf(vehicle.getRemainingBattery()));
add(String.valueOf(vehicle.getMaximumFeedbackPower()));
add(String.valueOf(vehicle.getMaximumDischargePower()));
add(String.valueOf(vehicle.getSelfCheckCounter()));
add(String.valueOf(vehicle.getTotalBatteryCurrent()));
add(String.valueOf(vehicle.getTotalBatteryVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMaxVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMinVoltage()));
add(String.valueOf(vehicle.getSingleBatteryMaxTemperature()));
add(String.valueOf(vehicle.getSingleBatteryMinTemperature()));
add(String.valueOf(vehicle.getAvailableBatteryCapacity()));
add(String.valueOf(vehicle.getVehicleStatus()));
add(String.valueOf(vehicle.getChargingStatus()));
add(String.valueOf(vehicle.getOperatingStatus()));
add(String.valueOf(vehicle.getChargingEnergyStorageStatus()));
add(String.valueOf(vehicle.getDriveMotorStatus()));
add(String.valueOf(vehicle.getPositionStatus()));
add(String.valueOf(vehicle.getEasStatus()));
add(String.valueOf(vehicle.getPtcStatus()));
add(String.valueOf(vehicle.getEpsStatus()));
add(String.valueOf(vehicle.getAbsStatus()));
add(String.valueOf(vehicle.getMcuStatus()));
add(String.valueOf(vehicle.getHeatingStatus()));
add(String.valueOf(vehicle.getBatteryStatus()));
add(String.valueOf(vehicle.getBatteryInsulationStatus()));
add(String.valueOf(vehicle.getDcdcStatus()));
add(String.valueOf(vehicle.getChgStatus()));
}};
iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicle.getStartTime()), measurementsList, valuesList);
}
@Override
public List<Vehicle> queryDataFromIotDb(VehicleReq req) throws Exception {
List<Vehicle> iotDbResultList = new ArrayList<>();
if (null != req.getVin()) {
String sql = "select * from " + "root.vin." + req.getVin();
// 开始时间
if (req.getStartTime() != null) {
sql += " where startTime >= " + req.getStartTime();
}
// 结束时间
if (req.getStartTime() != null) {
sql += " and startTime < " + req.getEndTime();
}
SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
List<String> columnNames = sessionDataSet.getColumnNames();
List<String> titleList = new ArrayList<>();
// 排除Time字段 -- 方便后面后面拼装数据
for (int i = 1; i < columnNames.size(); i++) {
String[] temp = columnNames.get(i).split("\\.");
titleList.add(temp[temp.length - 1]);
}
// 封装处理数据
packagingData(req, iotDbResultList, sessionDataSet, titleList);
} else {
log.info("VIN不能为空");
}
return iotDbResultList;
}
/**
*
*
* @param req
* @param iotDbResultList
* @param sessionDataSet
* @param titleList
* @throws StatementExecutionException
* @throws IoTDBConnectionException
*/
private void packagingData(VehicleReq req, List<Vehicle> iotDbResultList, SessionDataSet sessionDataSet, List<String> titleList)
throws StatementExecutionException, IoTDBConnectionException {
int fetchSize = sessionDataSet.getFetchSize();
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
Vehicle vehicle = new Vehicle();
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
vehicle.setStartTime(next.getTimestamp());
Map<String, String> map = new HashMap<>();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
}
log.info(map);
// vehicle.setStartTime(Long.valueOf(map.get("startTime")));
vehicle.setVin(map.get("vin"));
vehicle.setSpeed(new BigDecimal(map.get("speed")));
vehicle.setLatitude(new BigDecimal(map.get("latitude")));
vehicle.setLongitude(new BigDecimal(map.get("longitude")));
vehicle.setMileage(new BigDecimal(map.get("mileage")));
vehicle.setGear(map.get("gear"));
vehicle.setVehicleStatus(new BigDecimal(map.get("vehicleStatus")).intValue());
vehicle.setChargingStatus(new BigDecimal(map.get("chargingStatus")).intValue());
vehicle.setChargingEnergyStorageStatus(new BigDecimal(map.get("chargingEnergyStorageStatus")).intValue());
vehicle.setEasStatus(new BigDecimal(map.get("easStatus")).intValue());
vehicle.setMotorControllerTemperature(new BigDecimal(map.get("motorControllerTemperature")));
vehicle.setTotalBatteryCurrent(new BigDecimal(map.get("totalBatteryCurrent")));
vehicle.setSingleBatteryMaxVoltage(new BigDecimal(map.get("singleBatteryMaxVoltage")));
vehicle.setOperatingStatus(new BigDecimal(map.get("operatingStatus")).intValue());
vehicle.setHeatingStatus(new BigDecimal(map.get("heatingStatus")).intValue());
vehicle.setDcdcStatus(new BigDecimal(map.get("dcdcStatus")).intValue());
vehicle.setDriveMotorStatus(new BigDecimal(map.get("driveMotorStatus")).intValue());
vehicle.setPositionStatus(new BigDecimal(map.get("positionStatus")).intValue());
vehicle.setPtcStatus(new BigDecimal(map.get("ptcStatus")).intValue());
vehicle.setEpsStatus(new BigDecimal(map.get("epsStatus")).intValue());
vehicle.setAbsStatus(new BigDecimal(map.get("absStatus")).intValue());
vehicle.setMcuStatus(new BigDecimal(map.get("mcuStatus")).intValue());
vehicle.setBatteryInsulationStatus(new BigDecimal(map.get("batteryInsulationStatus")).intValue());
vehicle.setBatteryStatus(new BigDecimal(map.get("batteryStatus")).intValue());
vehicle.setChgStatus(new BigDecimal(map.get("chgStatus")).intValue());
vehicle.setTotalBatteryVoltage(new BigDecimal(map.get("totalBatteryVoltage")));
vehicle.setMotorSpeed(new BigDecimal(map.get("motorSpeed")));
vehicle.setMotorCurrent(new BigDecimal(map.get("motorCurrent")));
vehicle.setMotorVoltage(new BigDecimal(map.get("motorVoltage")));
vehicle.setAccelerationPedal(new BigDecimal(map.get("accelerationPedal")));
vehicle.setBrakePedal(new BigDecimal(map.get("brakePedal")));
vehicle.setSelfCheckCounter(new BigDecimal(map.get("selfCheckCounter")));
vehicle.setMotorTemperature(new BigDecimal(map.get("motorTemperature")));
vehicle.setMaximumDischargePower(new BigDecimal(map.get("maximumDischargePower")));
vehicle.setMaximumFeedbackPower(new BigDecimal(map.get("maximumFeedbackPower")));
vehicle.setFuelConsumptionRate(new BigDecimal(map.get("fuelConsumptionRate")));
vehicle.setVin(req.getVin());
vehicle.setVoltage(new BigDecimal(map.get("voltage")));
vehicle.setCurrent(new BigDecimal(map.get("current")));
vehicle.setResistance(new BigDecimal(map.get("resistance")));
vehicle.setMotoTorque(new BigDecimal(map.get("motoTorque")));
vehicle.setRemainingBattery(new BigDecimal(map.get("remainingBattery")));
vehicle.setSingleBatteryMinVoltage(new BigDecimal(map.get("singleBatteryMinVoltage")));
vehicle.setSingleBatteryMaxTemperature(new BigDecimal(map.get("singleBatteryMaxTemperature")));
vehicle.setSingleBatteryMinTemperature(new BigDecimal(map.get("singleBatteryMinTemperature")));
vehicle.setAvailableBatteryCapacity(new BigDecimal(map.get("availableBatteryCapacity")));
iotDbResultList.add(vehicle);
}
}
}
}

View File

@ -0,0 +1,51 @@
package com.mobai.kafka.listener;
import com.alibaba.fastjson2.JSON;
import com.mobai.domain.Vehicle;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
*
* @description: kafka
* @copyright: @Copyright (c) 2022
* @company: hmblogs
* @author: heming
* @version: 1.0.0
* @createTime: 2024-01-18 8:31
*/
@Component
@Slf4j
public class kafkaConsumerListenerExample {
@KafkaListener(topics = {"topic0","topic1"}, groupId = "0")
public void consume(ConsumerRecord<String, String> record) {
Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
// 进行消息处理逻辑
log.info("车辆报文信息 " + value);
}
//批量消费
@KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
public void onBatchMessage(List<ConsumerRecord<String, String>> records) {
System.out.println(">>> 批量消费一次recoreds.size()=" + records.size());
// List<Vehicle> collect = records.stream().map(record -> {
// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
// log.info("车辆报文:{}", value);
// return value;
// }).toList();
// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
System.out.println(records);
}
}

View File

@ -1,5 +1,8 @@
server:
port: 8084
port: 10001
kafka:
topic: vehicle-event-topic0
partition: 0
spring:
redis:
host: 127.0.0.1
@ -8,4 +11,87 @@ spring:
stream:
username: guest
password: guest
# kafka 配置
kafka:
producer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错
# transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。
retries: 3
# acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
# 开启事务时必须设置为all
acks: all
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 生产者内存缓冲区的大小。
buffer-memory: 1024000
# 键的序列化方式
# key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# # 值的序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger:
ms: 2000 # 延迟提交
partitioner: #指定分区器
class: com.mobai.kafka.CustomPartitioner # 分区器路径
consumer:
# Kafka服务器
bootstrap-servers: 127.0.0.1:9092
group-id: firstGroup #默认消费者组ID
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录
# none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常
auto-offset-reset: earliest
# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# 值的反序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
# value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
properties:
spring:
json:
trusted:
packages: "*"
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
# 这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。
# 这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数
# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 3
properties:
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
max:
poll:
interval:
ms: 600000
# 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10s
session:
timeout:
ms: 10000
listener:
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误
missing-topics-fatal: false
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000

View File

@ -10,10 +10,9 @@
<packaging>pom</packaging>
<modules>
<module>mobai-event-common</module>
<module>mobai-event-service</module>
<module>mobai-event-remote</module>
<module>mobai-event-client</module>
<module>mobai-event-iotDBDemo</module>
<module>mobai-event-remote</module>
<module>mobai-event-service</module>
</modules>
<properties>