diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..ee2c34b
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index c3f3b0a..c60a15a 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,3 +1,4 @@
+
@@ -6,6 +7,11 @@
+
diff --git a/mobai-event-client/pom.xml b/mobai-event-client/pom.xml
index 3cb63ae..df20258 100644
--- a/mobai-event-client/pom.xml
+++ b/mobai-event-client/pom.xml
@@ -18,6 +18,10 @@
+
+ org.springframework.boot
+ spring-boot-starter
+
com.mobai
diff --git a/mobai-event-service/src/main/java/com/mobai/EventAnalysisProducerApplication.java b/mobai-event-client/src/main/java/com/mobai/EventAnalysisProducerApplication.java
similarity index 100%
rename from mobai-event-service/src/main/java/com/mobai/EventAnalysisProducerApplication.java
rename to mobai-event-client/src/main/java/com/mobai/EventAnalysisProducerApplication.java
diff --git a/mobai-event-service/src/main/java/com/mobai/controller/Producer.java b/mobai-event-client/src/main/java/com/mobai/controller/Producer.java
similarity index 100%
rename from mobai-event-service/src/main/java/com/mobai/controller/Producer.java
rename to mobai-event-client/src/main/java/com/mobai/controller/Producer.java
diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java b/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java
deleted file mode 100644
index 85b4d2a..0000000
--- a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/demo/Recv.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.mobai.vehicle.event.client.demo;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DeliverCallback;
-
-public class Recv {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("175.24.138.82");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
-}
diff --git a/mobai-event-client/src/main/resources/application.yml b/mobai-event-client/src/main/resources/application.yml
index b1ad429..c28bbd0 100644
--- a/mobai-event-client/src/main/resources/application.yml
+++ b/mobai-event-client/src/main/resources/application.yml
@@ -1,9 +1,8 @@
server:
- port: 10001
-kafka:
- topic: vehicle-event-topic0
- partition: 0
+ port: 8084
spring:
+ redis:
+ host: 127.0.0.1
rabbitmq:
host: 175.24.138.82
stream:
diff --git a/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java b/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java
new file mode 100644
index 0000000..758d5c4
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/domain/Vehicle.java
@@ -0,0 +1,208 @@
+package com.mobai.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+/**
+ * 车辆数据信息
+ * @author Mobai
+ * @className Vehicle
+ * @description 描述
+ * @date 2024/6/6 8:18
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Vehicle implements Serializable {
+ /**
+ * 车辆vin
+ */
+ private String vin;
+ /**
+ * 时间戳
+ */
+ private Long startTime;
+ /**
+ * 经度
+ */
+ private BigDecimal longitude;
+ /**
+ * 纬度
+ */
+ private BigDecimal latitude;
+ /**
+ * 速度
+ */
+ private BigDecimal speed;
+ /**
+ * 总里程
+ */
+ private BigDecimal mileage;
+ /**
+ * 总电压
+ */
+ private BigDecimal voltage;
+ /**
+ * 总电流
+ */
+ private BigDecimal current;
+ /**
+ * 绝缘电阻
+ */
+ private BigDecimal resistance;
+ /**
+ * 档位
+ */
+ private String gear;
+ /**
+ * 加速踏板行程值
+ */
+ private BigDecimal accelerationPedal;
+ /**
+ * 制动踏板行程值
+ */
+ private BigDecimal brakePedal;
+ /**
+ * 燃料消耗率
+ */
+ private BigDecimal fuelConsumptionRate;
+ /**
+ * 电机控制器温度
+ */
+ private BigDecimal motorControllerTemperature;
+ /**
+ * 电机转速
+ */
+ private BigDecimal motorSpeed;
+ /**
+ * 电机转矩
+ */
+ private BigDecimal motoTorque;
+ /**
+ * 电机温度
+ */
+ private BigDecimal motorTemperature;
+ /**
+ * 电机电压
+ */
+ private BigDecimal motorVoltage;
+ /**
+ * 电机电流
+ */
+ private BigDecimal motorCurrent;
+ /**
+ * 动力电池剩余电量SOC
+ */
+ private BigDecimal remainingBattery;
+ /**
+ * 当前状态允许的最大反馈功率
+ */
+ private BigDecimal maximumFeedbackPower;
+ /**
+ * 当前状态允许的最大放电功率
+ */
+ private BigDecimal maximumDischargePower;
+ /**
+ * BMS自检计数器
+ */
+ private BigDecimal selfCheckCounter;
+ /**
+ * 动力电池充放电电流
+ */
+ private BigDecimal totalBatteryCurrent;
+ /**
+ * 动力电池负载端总电压V3
+ */
+ private BigDecimal totalBatteryVoltage;
+ /**
+ * 单次最大电压
+ */
+ private BigDecimal singleBatteryMaxVoltage;
+ /**
+ * 单次最低电压
+ */
+ private BigDecimal singleBatteryMinVoltage;
+ /**
+ * 单体电池最高温度
+ */
+ private BigDecimal singleBatteryMaxTemperature;
+ /**
+ * 单体电池最低温度
+ */
+ private BigDecimal singleBatteryMinTemperature;
+ /**
+ * 动力电池可用容量
+ */
+ private BigDecimal availableBatteryCapacity;
+ /**
+ * 车辆状态
+ */
+ private Integer vehicleStatus;
+ /**
+ * 充电状态
+ */
+ private Integer chargingStatus;
+ /**
+ * 运行状态
+ */
+ private Integer operatingStatus;
+ /**
+ * SOC
+ */
+ private Integer chargingEnergyStorageStatus;
+ /**
+ * 可充电储能装置工作状态
+ */
+ private Integer driveMotorStatus;
+ /**
+ * 定位是否有效
+ */
+ private Integer positionStatus;
+ /**
+ * EAS
+ */
+ private Integer easStatus;
+ /**
+ * PTC
+ */
+ private Integer ptcStatus;
+ /**
+ * EPS
+ */
+ private Integer epsStatus;
+ /**
+ * ABS
+ */
+ private Integer absStatus;
+ /**
+ * MCU
+ */
+ private Integer mcuStatus;
+ /**
+ * 动力电池加热状态
+ */
+ private Integer heatingStatus;
+ /**
+ * 动力电池当前状态
+ */
+ private Integer batteryStatus;
+ /**
+ * 动力电池保温状态
+ */
+ private Integer batteryInsulationStatus;
+ /**
+ * DCDC
+ */
+ private Integer dcdcStatus;
+ /**
+ * CHG
+ */
+ private Integer chgStatus;
+
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java b/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java
new file mode 100644
index 0000000..eec4d0d
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/req/VehicleReq.java
@@ -0,0 +1,34 @@
+package com.mobai.req;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author Mobai
+ * @className VehicleReq
+ * @description 描述
+ * @date 2024/6/17 19:24
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class VehicleReq {
+ /**
+ * 车辆唯一标识
+ */
+ private String vin;
+
+ /**
+ * 开始时间点
+ */
+ private Long startTime;
+
+ /**
+ * 结束时间点
+ */
+ private Long endTime;
+
+}
diff --git a/mobai-event-common/src/main/java/com/mobai/车辆参数 b/mobai-event-common/src/main/java/com/mobai/车辆参数
new file mode 100644
index 0000000..a0c5855
--- /dev/null
+++ b/mobai-event-common/src/main/java/com/mobai/车辆参数
@@ -0,0 +1,46 @@
+vin
+startTime
+longitude
+latitude
+speed
+mileage
+voltage
+current
+resistance
+gear
+accelerationPedal
+brakePedal
+fuelConsumptionRate
+motorControllerTemperature
+motorSpeed
+motoTorque
+motorTemperature
+motorVoltage
+motorCurrent
+remainingBattery
+maximumFeedbackPower
+maximumDischargePower
+selfCheckCounter
+totalBatteryCurrent
+totalBatteryVoltage
+singleBatteryMaxVoltage
+singleBatteryMinVoltage
+singleBatteryMaxTemperature
+singleBatteryMinTemperature
+availableBatteryCapacity
+vehicleStatus
+chargingStatus
+operatingStatus
+chargingEnergyStorageStatus
+driveMotorStatus
+positionStatus
+easStatus
+ptcStatus
+epsStatus
+absStatus
+mcuStatus
+heatingStatus
+batteryStatus
+batteryInsulationStatus
+dcdcStatus
+chgStatus
diff --git a/mobai-event-iotDBDemo/pom.xml b/mobai-event-iotDBDemo/pom.xml
deleted file mode 100644
index 93c6037..0000000
--- a/mobai-event-iotDBDemo/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-
-
- 4.0.0
-
- com.mobai
- event-analysis
- 1.0.0
-
-
- mobai-event-iotDBDemo
-
-
- 17
- 17
- UTF-8
-
-
-
-
- org.apache.iotdb
- iotdb-session
- 0.14.0-preview1
-
-
-
- cn.hutool
- hutool-all
- 5.6.3
-
-
-
- com.alibaba
- fastjson
- 1.2.83
-
-
-
- org.springframework.boot
- spring-boot-starter-web
-
-
-
- org.projectlombok
- lombok
- true
-
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
-
-
- com.mobai
- mobai-event-common
- 1.0.0
- compile
-
-
-
-
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java
deleted file mode 100644
index d37022d..0000000
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/IotDBApplication.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.mobai;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class IotDBApplication {
- public static void main(String[] args) {
-
- SpringApplication.run(IotDBApplication.class);
- }
-}
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java
deleted file mode 100644
index b46ce3b..0000000
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.mobai.domain;
-
-import lombok.Data;
-
-/**
- * description: 返回结果
- * date: 2022/8/15 21:56
- * author: zhouhong
- */
-@Data
-public class IotDbResult {
- /***
- * 时间
- */
- private String time;
- /***
- * 产品PK
- */
- private String pk;
- /***
- * 设备号
- */
- private String sn;
- /***
- * 实时呼吸
- */
- private String breath;
- /***
- * 实时心率
- */
- private String heart;
-
-}
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java
deleted file mode 100644
index 9bae9bf..0000000
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/IotDbServer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.mobai.service;
-
-import com.mobai.domain.IotDbParam;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-import java.rmi.ServerException;
-
-/**
- * @ClassName IotDbServer
- * @Description 描述
- * @Author Mobai
- * @Date 2024/6/17 17:20
- */
-public interface IotDbServer {
-
- void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException;
-
- Object queryDataFromIotDb(IotDbParam iotDbParam) throws Exception;
-
-}
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java b/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java
deleted file mode 100644
index a86659b..0000000
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/service/impl/IotDbServerImpl.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.mobai.service.impl;
-
-import com.mobai.config.IotDBSessionConfig;
-import com.mobai.domain.IotDbParam;
-import com.mobai.domain.IotDbResult;
-import com.mobai.service.IotDbServer;
-import lombok.extern.log4j.Log4j2;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.rmi.ServerException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * description: iot服务实现类
- * date: 2022/8/15 9:43
- * author: zhouhong
- */
-
-@Log4j2
-@Service
-public class IotDbServerImpl implements IotDbServer {
-
- @Resource
- private IotDBSessionConfig iotDBSessionConfig;
-
- @Override
- public void insertData(IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
- // iotDbParam: 模拟设备上报消息
- // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
- String deviceId = "root.bizkey."+ iotDbParam.getPk() + "." + iotDbParam.getSn();
- // 将设备上报的数据存入数据库(时序数据库)
- List measurementsList = new ArrayList<>();
- measurementsList.add("heart");
- measurementsList.add("breath");
- List valuesList = new ArrayList<>();
- valuesList.add(String.valueOf(iotDbParam.getHeart()));
- valuesList.add(String.valueOf(iotDbParam.getBreath()));
- iotDBSessionConfig.insertRecord(deviceId, iotDbParam.getTime(), measurementsList, valuesList);
- }
-
- @Override
- public List queryDataFromIotDb(IotDbParam iotDbParam) throws Exception {
- List iotDbResultList = new ArrayList<>();
-
- if (null != iotDbParam.getPk() && null != iotDbParam.getSn()) {
- String sql = "select * from root.bizkey."+ iotDbParam.getPk() +"." + iotDbParam.getSn() + " where time >= "
- + iotDbParam.getStartTime() + " and time < " + iotDbParam.getEndTime();
- SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
- List columnNames = sessionDataSet.getColumnNames();
- List titleList = new ArrayList<>();
- // 排除Time字段 -- 方便后面后面拼装数据
- for (int i = 1; i < columnNames.size(); i++) {
- String[] temp = columnNames.get(i).split("\\.");
- titleList.add(temp[temp.length - 1]);
- }
- // 封装处理数据
- packagingData(iotDbParam, iotDbResultList, sessionDataSet, titleList);
- } else {
- log.info("PK或者SN不能为空!!");
- }
- return iotDbResultList;
- }
- /**
- * 封装处理数据
- * @param iotDbParam
- * @param iotDbResultList
- * @param sessionDataSet
- * @param titleList
- * @throws StatementExecutionException
- * @throws IoTDBConnectionException
- */
- private void packagingData(IotDbParam iotDbParam, List iotDbResultList, SessionDataSet sessionDataSet, List titleList)
- throws StatementExecutionException, IoTDBConnectionException {
- int fetchSize = sessionDataSet.getFetchSize();
- if (fetchSize > 0) {
- while (sessionDataSet.hasNext()) {
- IotDbResult iotDbResult = new IotDbResult();
- RowRecord next = sessionDataSet.next();
- List fields = next.getFields();
- String timeString = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(next.getTimestamp());
- iotDbResult.setTime(timeString);
- Map map = new HashMap<>();
-
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- // 这里的需要按照类型获取
- map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
- }
- iotDbResult.setTime(timeString);
- iotDbResult.setPk(iotDbParam.getPk());
- iotDbResult.setSn(iotDbParam.getSn());
- iotDbResult.setHeart(map.get("heart"));
- iotDbResult.setBreath(map.get("breath"));
- iotDbResultList.add(iotDbResult);
- }
- }
- }
-}
diff --git a/mobai-event-iotDBDemo/src/main/resources/application.yml b/mobai-event-iotDBDemo/src/main/resources/application.yml
deleted file mode 100644
index e5e12e4..0000000
--- a/mobai-event-iotDBDemo/src/main/resources/application.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-server:
- port: 8085
-spring:
- redis:
- host: 127.0.0.1
- rabbitmq:
- host: 175.24.138.82
- stream:
- username: guest
- password: guest
-
diff --git a/mobai-event-service/pom.xml b/mobai-event-service/pom.xml
index a3e23bb..2a51adc 100644
--- a/mobai-event-service/pom.xml
+++ b/mobai-event-service/pom.xml
@@ -18,15 +18,69 @@
-
- org.springframework.boot
- spring-boot-starter
-
com.mobai
mobai-event-common
1.0.0
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.8.11
+
+
+
+ junit
+ junit
+ test
+
+
+
+ org.apache.iotdb
+ iotdb-session
+ 0.14.0-preview1
+
+
+
+ cn.hutool
+ hutool-all
+ 5.6.3
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.83
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+
+
+
+
diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
similarity index 68%
rename from mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java
rename to mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
index 2cee8d8..1cc451b 100644
--- a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/EventClientApplication.java
+++ b/mobai-event-service/src/main/java/com/mobai/EventServiceApplication.java
@@ -1,4 +1,4 @@
-package com.mobai.vehicle.event.client;
+package com.mobai;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -10,8 +10,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @date 2024/6/14 20:40
*/
@SpringBootApplication
-public class EventClientApplication {
+public class EventServiceApplication {
public static void main(String[] args) {
- SpringApplication.run(EventClientApplication.class);
+ SpringApplication.run(EventServiceApplication.class);
}
}
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
similarity index 96%
rename from mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java
rename to mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
index d0d6bf9..35a4239 100644
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/config/IotDBSessionConfig.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/config/IotDBSessionConfig.java
@@ -1,4 +1,4 @@
-package com.mobai.config;
+package com.mobai.iotDB.config;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -34,7 +34,13 @@ public class IotDBSessionConfig {
public Session getSession() throws IoTDBConnectionException, StatementExecutionException {
if (session == null) {
log.info("正在连接iotdb.......");
- session = new Session.Builder().host(LOCAL_HOST).port(6667).username("root").password("root").version(Version.V_0_13).build();
+ session = new Session.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .username("root")
+ .password("root")
+ .version(Version.V_0_13)
+ .build();
session.open(false);
session.setFetchSize(100);
log.info("iotdb连接成功~");
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
similarity index 58%
rename from mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java
rename to mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
index 2fb3841..a7998f1 100644
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/controller/IotDbController.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/controller/IotDbController.java
@@ -1,14 +1,16 @@
-package com.mobai.controller;
+package com.mobai.iotDB.controller;
-
-import com.mobai.config.IotDBSessionConfig;
-import com.mobai.domain.IotDbParam;
-import com.mobai.domain.resp.ResponseData;
-import com.mobai.service.IotDbServer;
+import com.mobai.domain.Vehicle;
+import com.mobai.iotDB.config.IotDBSessionConfig;
+import com.mobai.iotDB.domain.resp.ResponseData;
+import com.mobai.iotDB.service.IotDbServer;
+import com.mobai.req.VehicleReq;
import lombok.extern.log4j.Log4j2;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.rmi.ServerException;
@@ -29,21 +31,21 @@ public class IotDbController {
/**
* 插入数据
- * @param iotDbParam
+ * @param vehicle
*/
@PostMapping("/api/device/insert")
- public ResponseData insert(@RequestBody IotDbParam iotDbParam) throws StatementExecutionException, ServerException, IoTDBConnectionException {
- iotDbServer.insertData(iotDbParam);
+ public ResponseData insert(@RequestBody Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
+ iotDbServer.insertData(vehicle);
return ResponseData.success();
}
/**
- * 插入数据
- * @param iotDbParam
+ * 查询数据
+ * @param vehicle
*/
@PostMapping("/api/device/queryData")
- public ResponseData queryDataFromIotDb(@RequestBody IotDbParam iotDbParam) throws Exception {
- return ResponseData.success(iotDbServer.queryDataFromIotDb(iotDbParam));
+ public ResponseData queryDataFromIotDb(@RequestBody VehicleReq vehicle) throws Exception {
+ return ResponseData.success(iotDbServer.queryDataFromIotDb(vehicle));
}
/**
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbParam.java
similarity index 94%
rename from mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java
rename to mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbParam.java
index 9a6b4db..ddd2344 100644
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/IotDbParam.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbParam.java
@@ -1,4 +1,4 @@
-package com.mobai.domain;
+package com.mobai.iotDB.domain;
import lombok.Data;
/**
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbResult.java b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbResult.java
new file mode 100644
index 0000000..9dea676
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/IotDbResult.java
@@ -0,0 +1,23 @@
+package com.mobai.iotDB.domain;
+
+import lombok.Data;
+
+/**
+ * description: 返回结果
+ * date: 2022/8/15 21:56
+ * author: zhouhong
+ */
+@Data
+public class IotDbResult {
+ /***
+ * 时间
+ */
+ private String startTime;
+ /***
+ * 车辆唯一标识
+ */
+ private String VIN;
+
+
+
+}
diff --git a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/resp/ResponseData.java
similarity index 96%
rename from mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java
rename to mobai-event-service/src/main/java/com/mobai/iotDB/domain/resp/ResponseData.java
index e0d474f..018c52d 100644
--- a/mobai-event-iotDBDemo/src/main/java/com/mobai/domain/resp/ResponseData.java
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/domain/resp/ResponseData.java
@@ -1,4 +1,4 @@
-package com.mobai.domain.resp;
+package com.mobai.iotDB.domain.resp;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java
new file mode 100644
index 0000000..169c55a
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/IotDbServer.java
@@ -0,0 +1,23 @@
+package com.mobai.iotDB.service;
+
+import com.mobai.domain.Vehicle;
+import com.mobai.iotDB.domain.IotDbParam;
+import com.mobai.req.VehicleReq;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import java.rmi.ServerException;
+
+/**
+ * @ClassName IotDbServer
+ * @Description 描述
+ * @Author Mobai
+ * @Date 2024/6/17 17:20
+ */
+public interface IotDbServer {
+
+ void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException;
+
+ Object queryDataFromIotDb(VehicleReq req) throws Exception;
+
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
new file mode 100644
index 0000000..0face5a
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/iotDB/service/impl/IotDbServerImpl.java
@@ -0,0 +1,252 @@
+package com.mobai.iotDB.service.impl;
+
+import com.mobai.domain.Vehicle;
+import com.mobai.iotDB.config.IotDBSessionConfig;
+import com.mobai.iotDB.service.IotDbServer;
+import com.mobai.req.VehicleReq;
+import lombok.extern.log4j.Log4j2;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.rmi.ServerException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * description: iot服务实现类
+ * date: 2022/8/15 9:43
+ * author: zhouhong
+ */
+
+@Log4j2
+@Service
+public class IotDbServerImpl implements IotDbServer {
+
+ @Resource
+ private IotDBSessionConfig iotDBSessionConfig;
+
+
+ @Override
+ public void insertData(Vehicle vehicle) throws StatementExecutionException, ServerException, IoTDBConnectionException {
+ // iotDbParam: 模拟设备上报消息
+ // bizkey: 业务唯一key PK :产品唯一编码 SN:设备唯一编码
+ String deviceId = "root.vin." + vehicle.getVin();
+ // 将设备上报的数据存入数据库(时序数据库)
+ List measurementsList = new ArrayList<>() {{
+ add("startTime");
+ add("longitude");
+ add("latitude");
+ add("speed");
+ add("mileage");
+ add("voltage");
+ add("current");
+ add("resistance");
+ add("gear");
+ add("accelerationPedal");
+ add("brakePedal");
+ add("fuelConsumptionRate");
+ add("motorControllerTemperature");
+ add("motorSpeed");
+ add("motoTorque");
+ add("motorTemperature");
+ add("motorVoltage");
+ add("motorCurrent");
+ add("remainingBattery");
+ add("maximumFeedbackPower");
+ add("maximumDischargePower");
+ add("selfCheckCounter");
+ add("totalBatteryCurrent");
+ add("totalBatteryVoltage");
+ add("singleBatteryMaxVoltage");
+ add("singleBatteryMinVoltage");
+ add("singleBatteryMaxTemperature");
+ add("singleBatteryMinTemperature");
+ add("availableBatteryCapacity");
+ add("vehicleStatus");
+ add("chargingStatus");
+ add("operatingStatus");
+ add("chargingEnergyStorageStatus");
+ add("driveMotorStatus");
+ add("positionStatus");
+ add("easStatus");
+ add("ptcStatus");
+ add("epsStatus");
+ add("absStatus");
+ add("mcuStatus");
+ add("heatingStatus");
+ add("batteryStatus");
+ add("batteryInsulationStatus");
+ add("dcdcStatus");
+ add("chgStatus");
+ }};
+ //车辆具体数据
+ List valuesList = new ArrayList<>() {{
+ add(String.valueOf(vehicle.getStartTime()));
+ add(String.valueOf(vehicle.getLongitude()));
+ add(String.valueOf(vehicle.getLatitude()));
+ add(String.valueOf(vehicle.getSpeed()));
+ add(String.valueOf(vehicle.getMileage()));
+ add(String.valueOf(vehicle.getVoltage()));
+ add(String.valueOf(vehicle.getCurrent()));
+ add(String.valueOf(vehicle.getResistance()));
+ add(String.valueOf(vehicle.getGear()));
+ add(String.valueOf(vehicle.getAccelerationPedal()));
+ add(String.valueOf(vehicle.getBrakePedal()));
+ add(String.valueOf(vehicle.getFuelConsumptionRate()));
+ add(String.valueOf(vehicle.getMotorControllerTemperature()));
+ add(String.valueOf(vehicle.getMotorSpeed()));
+ add(String.valueOf(vehicle.getMotoTorque()));
+ add(String.valueOf(vehicle.getMotorTemperature()));
+ add(String.valueOf(vehicle.getMotorVoltage()));
+ add(String.valueOf(vehicle.getMotorCurrent()));
+ add(String.valueOf(vehicle.getRemainingBattery()));
+ add(String.valueOf(vehicle.getMaximumFeedbackPower()));
+ add(String.valueOf(vehicle.getMaximumDischargePower()));
+ add(String.valueOf(vehicle.getSelfCheckCounter()));
+ add(String.valueOf(vehicle.getTotalBatteryCurrent()));
+ add(String.valueOf(vehicle.getTotalBatteryVoltage()));
+ add(String.valueOf(vehicle.getSingleBatteryMaxVoltage()));
+ add(String.valueOf(vehicle.getSingleBatteryMinVoltage()));
+ add(String.valueOf(vehicle.getSingleBatteryMaxTemperature()));
+ add(String.valueOf(vehicle.getSingleBatteryMinTemperature()));
+ add(String.valueOf(vehicle.getAvailableBatteryCapacity()));
+ add(String.valueOf(vehicle.getVehicleStatus()));
+ add(String.valueOf(vehicle.getChargingStatus()));
+ add(String.valueOf(vehicle.getOperatingStatus()));
+ add(String.valueOf(vehicle.getChargingEnergyStorageStatus()));
+ add(String.valueOf(vehicle.getDriveMotorStatus()));
+ add(String.valueOf(vehicle.getPositionStatus()));
+ add(String.valueOf(vehicle.getEasStatus()));
+ add(String.valueOf(vehicle.getPtcStatus()));
+ add(String.valueOf(vehicle.getEpsStatus()));
+ add(String.valueOf(vehicle.getAbsStatus()));
+ add(String.valueOf(vehicle.getMcuStatus()));
+ add(String.valueOf(vehicle.getHeatingStatus()));
+ add(String.valueOf(vehicle.getBatteryStatus()));
+ add(String.valueOf(vehicle.getBatteryInsulationStatus()));
+ add(String.valueOf(vehicle.getDcdcStatus()));
+ add(String.valueOf(vehicle.getChgStatus()));
+ }};
+
+ iotDBSessionConfig.insertRecord(deviceId, Long.valueOf(vehicle.getStartTime()), measurementsList, valuesList);
+ }
+
+
+ @Override
+ public List queryDataFromIotDb(VehicleReq req) throws Exception {
+ List iotDbResultList = new ArrayList<>();
+
+ if (null != req.getVin()) {
+ String sql = "select * from " + "root.vin." + req.getVin();
+ // 开始时间
+ if (req.getStartTime() != null) {
+ sql += " where startTime >= " + req.getStartTime();
+ }
+ // 结束时间
+ if (req.getStartTime() != null) {
+ sql += " and startTime < " + req.getEndTime();
+ }
+ SessionDataSet sessionDataSet = iotDBSessionConfig.query(sql);
+ List columnNames = sessionDataSet.getColumnNames();
+ List titleList = new ArrayList<>();
+ // 排除Time字段 -- 方便后面后面拼装数据
+ for (int i = 1; i < columnNames.size(); i++) {
+ String[] temp = columnNames.get(i).split("\\.");
+ titleList.add(temp[temp.length - 1]);
+ }
+ // 封装处理数据
+ packagingData(req, iotDbResultList, sessionDataSet, titleList);
+ } else {
+ log.info("VIN不能为空!!");
+ }
+ return iotDbResultList;
+ }
+
+ /**
+ * 封装处理数据
+ *
+ * @param req
+ * @param iotDbResultList
+ * @param sessionDataSet
+ * @param titleList
+ * @throws StatementExecutionException
+ * @throws IoTDBConnectionException
+ */
+ private void packagingData(VehicleReq req, List iotDbResultList, SessionDataSet sessionDataSet, List titleList)
+ throws StatementExecutionException, IoTDBConnectionException {
+ int fetchSize = sessionDataSet.getFetchSize();
+ if (fetchSize > 0) {
+ while (sessionDataSet.hasNext()) {
+ Vehicle vehicle = new Vehicle();
+ RowRecord next = sessionDataSet.next();
+ List fields = next.getFields();
+ vehicle.setStartTime(next.getTimestamp());
+ Map map = new HashMap<>();
+
+ for (int i = 0; i < fields.size(); i++) {
+
+ Field field = fields.get(i);
+ // 这里的需要按照类型获取
+ map.put(titleList.get(i), field.getObjectValue(field.getDataType()).toString());
+ }
+ log.info(map);
+// vehicle.setStartTime(Long.valueOf(map.get("startTime")));
+ vehicle.setVin(map.get("vin"));
+ vehicle.setSpeed(new BigDecimal(map.get("speed")));
+ vehicle.setLatitude(new BigDecimal(map.get("latitude")));
+ vehicle.setLongitude(new BigDecimal(map.get("longitude")));
+ vehicle.setMileage(new BigDecimal(map.get("mileage")));
+ vehicle.setGear(map.get("gear"));
+ vehicle.setVehicleStatus(new BigDecimal(map.get("vehicleStatus")).intValue());
+ vehicle.setChargingStatus(new BigDecimal(map.get("chargingStatus")).intValue());
+ vehicle.setChargingEnergyStorageStatus(new BigDecimal(map.get("chargingEnergyStorageStatus")).intValue());
+ vehicle.setEasStatus(new BigDecimal(map.get("easStatus")).intValue());
+ vehicle.setMotorControllerTemperature(new BigDecimal(map.get("motorControllerTemperature")));
+ vehicle.setTotalBatteryCurrent(new BigDecimal(map.get("totalBatteryCurrent")));
+ vehicle.setSingleBatteryMaxVoltage(new BigDecimal(map.get("singleBatteryMaxVoltage")));
+ vehicle.setOperatingStatus(new BigDecimal(map.get("operatingStatus")).intValue());
+ vehicle.setHeatingStatus(new BigDecimal(map.get("heatingStatus")).intValue());
+ vehicle.setDcdcStatus(new BigDecimal(map.get("dcdcStatus")).intValue());
+ vehicle.setDriveMotorStatus(new BigDecimal(map.get("driveMotorStatus")).intValue());
+ vehicle.setPositionStatus(new BigDecimal(map.get("positionStatus")).intValue());
+ vehicle.setPtcStatus(new BigDecimal(map.get("ptcStatus")).intValue());
+ vehicle.setEpsStatus(new BigDecimal(map.get("epsStatus")).intValue());
+ vehicle.setAbsStatus(new BigDecimal(map.get("absStatus")).intValue());
+ vehicle.setMcuStatus(new BigDecimal(map.get("mcuStatus")).intValue());
+ vehicle.setBatteryInsulationStatus(new BigDecimal(map.get("batteryInsulationStatus")).intValue());
+ vehicle.setBatteryStatus(new BigDecimal(map.get("batteryStatus")).intValue());
+ vehicle.setChgStatus(new BigDecimal(map.get("chgStatus")).intValue());
+ vehicle.setTotalBatteryVoltage(new BigDecimal(map.get("totalBatteryVoltage")));
+ vehicle.setMotorSpeed(new BigDecimal(map.get("motorSpeed")));
+ vehicle.setMotorCurrent(new BigDecimal(map.get("motorCurrent")));
+ vehicle.setMotorVoltage(new BigDecimal(map.get("motorVoltage")));
+ vehicle.setAccelerationPedal(new BigDecimal(map.get("accelerationPedal")));
+ vehicle.setBrakePedal(new BigDecimal(map.get("brakePedal")));
+ vehicle.setSelfCheckCounter(new BigDecimal(map.get("selfCheckCounter")));
+ vehicle.setMotorTemperature(new BigDecimal(map.get("motorTemperature")));
+ vehicle.setMaximumDischargePower(new BigDecimal(map.get("maximumDischargePower")));
+ vehicle.setMaximumFeedbackPower(new BigDecimal(map.get("maximumFeedbackPower")));
+ vehicle.setFuelConsumptionRate(new BigDecimal(map.get("fuelConsumptionRate")));
+ vehicle.setVin(req.getVin());
+ vehicle.setVoltage(new BigDecimal(map.get("voltage")));
+ vehicle.setCurrent(new BigDecimal(map.get("current")));
+ vehicle.setResistance(new BigDecimal(map.get("resistance")));
+ vehicle.setMotoTorque(new BigDecimal(map.get("motoTorque")));
+ vehicle.setRemainingBattery(new BigDecimal(map.get("remainingBattery")));
+ vehicle.setSingleBatteryMinVoltage(new BigDecimal(map.get("singleBatteryMinVoltage")));
+ vehicle.setSingleBatteryMaxTemperature(new BigDecimal(map.get("singleBatteryMaxTemperature")));
+ vehicle.setSingleBatteryMinTemperature(new BigDecimal(map.get("singleBatteryMinTemperature")));
+ vehicle.setAvailableBatteryCapacity(new BigDecimal(map.get("availableBatteryCapacity")));
+ iotDbResultList.add(vehicle);
+ }
+ }
+ }
+}
diff --git a/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java b/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java
new file mode 100644
index 0000000..3ee33e3
--- /dev/null
+++ b/mobai-event-service/src/main/java/com/mobai/kafka/listener/kafkaConsumerListenerExample.java
@@ -0,0 +1,51 @@
+package com.mobai.kafka.listener;
+
+import com.alibaba.fastjson2.JSON;
+import com.mobai.domain.Vehicle;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * @description: kafka 消费者
+ * @copyright: @Copyright (c) 2022
+ * @company: hmblogs
+ * @author: heming
+ * @version: 1.0.0
+ * @createTime: 2024-01-18 8:31
+ */
+@Component
+@Slf4j
+public class kafkaConsumerListenerExample {
+
+ @KafkaListener(topics = {"topic0","topic1"}, groupId = "0")
+ public void consume(ConsumerRecord record) {
+ Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
+ // 进行消息处理逻辑
+ log.info("车辆报文信息 : " + value);
+ }
+
+ //批量消费
+ @KafkaListener( topics = {"topic0","topic1"}, groupId = "Topics")
+ public void onBatchMessage(List> records) {
+ System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());
+// List collect = records.stream().map(record -> {
+// Vehicle value = JSON.parseObject(record.value(), Vehicle.class);
+// log.info("车辆报文:{}", value);
+// return value;
+// }).toList();
+// log.warn("批量消费的数量为:{},结果为:{}", records.size(), collect);
+ System.out.println(records);
+ }
+
+}
+
diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java
similarity index 100%
rename from mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java
rename to mobai-event-service/src/main/java/com/mobai/vehicle/event/client/config/KafkaComponent.java
diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java
similarity index 100%
rename from mobai-event-client/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java
rename to mobai-event-service/src/main/java/com/mobai/vehicle/event/client/config/MsgComponent.java
diff --git a/mobai-event-client/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java b/mobai-event-service/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java
similarity index 100%
rename from mobai-event-client/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java
rename to mobai-event-service/src/main/java/com/mobai/vehicle/event/client/domain/KafkaConfig.java
diff --git a/mobai-event-service/src/main/resources/application.yml b/mobai-event-service/src/main/resources/application.yml
index c28bbd0..53698b8 100644
--- a/mobai-event-service/src/main/resources/application.yml
+++ b/mobai-event-service/src/main/resources/application.yml
@@ -1,5 +1,8 @@
server:
- port: 8084
+ port: 10001
+kafka:
+ topic: vehicle-event-topic0
+ partition: 0
spring:
redis:
host: 127.0.0.1
@@ -8,4 +11,87 @@ spring:
stream:
username: guest
password: guest
+ # kafka 配置
+ kafka:
+ producer:
+ # Kafka服务器
+ bootstrap-servers: 127.0.0.1:9092
+ # 开启事务,必须在开启了事务的方法中发送,否则报错
+ # transaction-id-prefix: kafkaTx-
+ # 发生错误后,消息重发的次数,开启事务必须设置大于0。
+ retries: 3
+ # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+ # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+ # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+ # 开启事务时,必须设置为all
+ acks: all
+ # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+ batch-size: 16384
+ # 生产者内存缓冲区的大小。
+ buffer-memory: 1024000
+ # 键的序列化方式
+ # key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ # # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ properties:
+ linger:
+ ms: 2000 # 延迟提交
+ partitioner: #指定分区器
+ class: com.mobai.kafka.CustomPartitioner # 分区器路径
+ consumer:
+ # Kafka服务器
+ bootstrap-servers: 127.0.0.1:9092
+ group-id: firstGroup #默认消费者组ID
+ # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+ #auto-commit-interval: 2s
+ # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+ # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
+ # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
+ # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
+ auto-offset-reset: earliest
+ # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+ enable-auto-commit: false
+ # 键的反序列化方式
+ #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ # key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
+ properties:
+ spring:
+ json:
+ trusted:
+ packages: "*"
+ spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
+ # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
+ # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
+ # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
+ # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
+ # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
+ max-poll-records: 3
+ properties:
+ # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ max:
+ poll:
+ interval:
+ ms: 600000
+ # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
+ session:
+ timeout:
+ ms: 10000
+ listener:
+ # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
+ concurrency: 4
+ # 自动提交关闭,需要设置手动消息确认
+ ack-mode: manual_immediate
+ # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
+ missing-topics-fatal: false
+ # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ poll-timeout: 600000
diff --git a/pom.xml b/pom.xml
index 3e26f4d..9908984 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,10 +10,9 @@
pom
mobai-event-common
- mobai-event-service
- mobai-event-remote
mobai-event-client
- mobai-event-iotDBDemo
+ mobai-event-remote
+ mobai-event-service