From c46bb5a47f554346472084ff85a46c58d2a4ad1f Mon Sep 17 00:00:00 2001
From: Saisai Liu <1374434128@qq.com>
Date: Sun, 23 Jun 2024 21:31:03 +0800
Subject: [PATCH] =?UTF-8?q?feat():=E8=AE=BE=E7=BD=AE=E5=81=8F=E7=A7=BB?=
=?UTF-8?q?=E9=87=8F=E6=8F=90=E4=BA=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/vcs.xml | 1 -
src/main/java/com/mobai/domian/Vehicle.java | 6 +++-
.../java/com/mobai/kafka/KafkaPCUtils.java | 18 ++++++++++--
.../service/impl/MqttCallBackServiceImpl.java | 28 ++++++++++---------
src/main/resources/application.yml | 4 +++
5 files changed, 40 insertions(+), 17 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index f4cd3ff..94a25f7 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,7 +1,6 @@
-
\ No newline at end of file
diff --git a/src/main/java/com/mobai/domian/Vehicle.java b/src/main/java/com/mobai/domian/Vehicle.java
index 8aaa5c4..790449f 100644
--- a/src/main/java/com/mobai/domian/Vehicle.java
+++ b/src/main/java/com/mobai/domian/Vehicle.java
@@ -155,10 +155,14 @@ public class Vehicle implements Serializable {
/**
* SOC
*/
- private Integer chargingEnergyStorageStatus;
+ private Integer socStatus;
/**
* 可充电储能装置工作状态
*/
+ private Integer chargingEnergyStorageStatus;
+ /**
+ * 驱动电机状态
+ */
private Integer driveMotorStatus;
/**
* 定位是否有效
diff --git a/src/main/java/com/mobai/kafka/KafkaPCUtils.java b/src/main/java/com/mobai/kafka/KafkaPCUtils.java
index cba2fc2..9f53c1f 100644
--- a/src/main/java/com/mobai/kafka/KafkaPCUtils.java
+++ b/src/main/java/com/mobai/kafka/KafkaPCUtils.java
@@ -7,16 +7,22 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.util.Maps;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@@ -45,6 +51,8 @@ public class KafkaPCUtils {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,10000);
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
// properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName());
// 创建生产者
KafkaProducer producer = new KafkaProducer<>(properties);
@@ -61,7 +69,6 @@ public class KafkaPCUtils {
for (int i = 0; i < 8; i++) {
partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
}
-
int partition = new CustomizePartitioner().partition(
topic,
vin,
@@ -97,13 +104,20 @@ public class KafkaPCUtils {
//监听消费
@KafkaListener(topics = {"topic0", "topic1"})
- public void onNormalMessage1(ConsumerRecord record) {
+ public void onNormalMessage1(ConsumerRecord record, Acknowledgment acknowledgment) {
String value = (String) record.value();
JSON.parseObject(value, Vehicle.class);
+
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value());
+// Map commits = Maps.newHashMap(
+// new TopicPartition(record.topic(),record.partition()),
+// new OffsetAndMetadata(record.offset()));
+ acknowledgment.acknowledge();
}
+
+
// //批量消费
// @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group")
// public void onBatchMessage(List> records) {
diff --git a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java
index 6cdae06..c605fb1 100644
--- a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java
+++ b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java
@@ -90,19 +90,20 @@ public class MqttCallBackServiceImpl implements MqttCallback {
Integer vehicleStatus = Integer.valueOf(trainMsg.substring(189, 190)); // 车辆状态
Integer chargingStatus = Integer.valueOf(trainMsg.substring(190, 191)); // 充电状态
Integer operatingStatus = Integer.valueOf(trainMsg.substring(191, 192)); // 运行状态
- Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC
- Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 可充电储能装置工作状态
- Integer positionStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 定位是否有效
- Integer easStatus = Integer.valueOf(trainMsg.substring(195, 196)); // EAS(汽车防盗系统)状态
- Integer ptcStatus = Integer.valueOf(trainMsg.substring(196, 197)); // PTC(主动安全系统)状态
- Integer epsStatus = Integer.valueOf(trainMsg.substring(197, 198)); // EPS(电动助力系统)状态
- Integer absStatus = Integer.valueOf(trainMsg.substring(198, 199)); // ABS(防抱死)状态
- Integer mcuStatus = Integer.valueOf(trainMsg.substring(199, 200)); // MCU(电机/逆变器)状态
- Integer heatingStatus = Integer.valueOf(trainMsg.substring(200, 201)); // 动力电池加热状态
- Integer batteryStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池当前状态
- Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池保温状态
- Integer dcdcStatus = Integer.valueOf(trainMsg.substring(203, 204)); // DCDC(电力交换系统)状态
- Integer chgStatus = Integer.valueOf(trainMsg.substring(204, 205)); // CHG(充电机)状态
+ Integer socStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC
+ Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 驱动电机状态
+ Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 可充电储能装置工作状态
+ Integer positionStatus = Integer.valueOf(trainMsg.substring(195, 196)); // 定位是否有效
+ Integer easStatus = Integer.valueOf(trainMsg.substring(196, 197)); // EAS(汽车防盗系统)状态
+ Integer ptcStatus = Integer.valueOf(trainMsg.substring(197, 198)); // PTC(主动安全系统)状态
+ Integer epsStatus = Integer.valueOf(trainMsg.substring(198, 199)); // EPS(电动助力系统)状态
+ Integer absStatus = Integer.valueOf(trainMsg.substring(199, 200)); // ABS(防抱死)状态
+ Integer mcuStatus = Integer.valueOf(trainMsg.substring(200, 201)); // MCU(电机/逆变器)状态
+ Integer heatingStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池加热状态
+ Integer batteryStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池当前状态
+ Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(203, 204)); // 动力电池保温状态
+ Integer dcdcStatus = Integer.valueOf(trainMsg.substring(204, 205)); // DCDC(电力交换系统)状态
+ Integer chgStatus = Integer.valueOf(trainMsg.substring(205, 206)); // CHG(充电机)状态
Vehicle vehicle = new Vehicle(vin,
startTime,
@@ -137,6 +138,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
vehicleStatus,
chargingStatus,
operatingStatus,
+ socStatus,
chargingEnergyStorageStatus,
driveMotorStatus,
positionStatus,
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index aa0fe20..a739fc6 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -73,6 +73,9 @@ spring:
# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 3
+ # 心跳时间间隔,默认值为10s
+ heartbeat-interval:
+ ms: 10000
properties:
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
max:
@@ -94,6 +97,7 @@ spring:
poll-timeout: 600000
+
# forest配置
forest:
backend: okhttp3 # 后端HTTP框架(默认为 okhttp3)