feat():设置偏移量提交

master
Saisai Liu 2024-06-23 21:31:03 +08:00
parent 86cccdeace
commit c46bb5a47f
5 changed files with 40 additions and 17 deletions

View File

@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/../../" vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="$PROJECT_DIR$" vcs="Git" />
</component> </component>
</project> </project>

View File

@ -155,10 +155,14 @@ public class Vehicle implements Serializable {
/** /**
* SOC * SOC
*/ */
private Integer chargingEnergyStorageStatus; private Integer socStatus;
/** /**
* *
*/ */
private Integer chargingEnergyStorageStatus;
/**
*
*/
private Integer driveMotorStatus; private Integer driveMotorStatus;
/** /**
* *

View File

@ -7,16 +7,22 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.util.Maps;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -45,6 +51,8 @@ public class KafkaPCUtils {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,10000);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
// properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName()); // properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName());
// 创建生产者 // 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
@ -61,7 +69,6 @@ public class KafkaPCUtils {
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
partitionInfos.add(new PartitionInfo(topic,i,null,null,null)); partitionInfos.add(new PartitionInfo(topic,i,null,null,null));
} }
int partition = new CustomizePartitioner().partition( int partition = new CustomizePartitioner().partition(
topic, topic,
vin, vin,
@ -97,13 +104,20 @@ public class KafkaPCUtils {
//监听消费 //监听消费
@KafkaListener(topics = {"topic0", "topic1"}) @KafkaListener(topics = {"topic0", "topic1"})
public void onNormalMessage1(ConsumerRecord<String, Object> record) { public void onNormalMessage1(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment) {
String value = (String) record.value(); String value = (String) record.value();
JSON.parseObject(value, Vehicle.class); JSON.parseObject(value, Vehicle.class);
System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
record.value()); record.value());
// Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap(
// new TopicPartition(record.topic(),record.partition()),
// new OffsetAndMetadata(record.offset()));
acknowledgment.acknowledge();
} }
// //批量消费 // //批量消费
// @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group") // @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group")
// public void onBatchMessage(List<ConsumerRecord<String, Object>> records) { // public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {

View File

@ -90,19 +90,20 @@ public class MqttCallBackServiceImpl implements MqttCallback {
Integer vehicleStatus = Integer.valueOf(trainMsg.substring(189, 190)); // 车辆状态 Integer vehicleStatus = Integer.valueOf(trainMsg.substring(189, 190)); // 车辆状态
Integer chargingStatus = Integer.valueOf(trainMsg.substring(190, 191)); // 充电状态 Integer chargingStatus = Integer.valueOf(trainMsg.substring(190, 191)); // 充电状态
Integer operatingStatus = Integer.valueOf(trainMsg.substring(191, 192)); // 运行状态 Integer operatingStatus = Integer.valueOf(trainMsg.substring(191, 192)); // 运行状态
Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC Integer socStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC
Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 可充电储能装置工作状态 Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 驱动电机状态
Integer positionStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 定位是否有效 Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 可充电储能装置工作状态
Integer easStatus = Integer.valueOf(trainMsg.substring(195, 196)); // EAS(汽车防盗系统)状态 Integer positionStatus = Integer.valueOf(trainMsg.substring(195, 196)); // 定位是否有效
Integer ptcStatus = Integer.valueOf(trainMsg.substring(196, 197)); // PTC(主动安全系统)状态 Integer easStatus = Integer.valueOf(trainMsg.substring(196, 197)); // EAS(汽车防盗系统)状态
Integer epsStatus = Integer.valueOf(trainMsg.substring(197, 198)); // EPS(电动助力系统)状态 Integer ptcStatus = Integer.valueOf(trainMsg.substring(197, 198)); // PTC(主动安全系统)状态
Integer absStatus = Integer.valueOf(trainMsg.substring(198, 199)); // ABS(防抱死)状态 Integer epsStatus = Integer.valueOf(trainMsg.substring(198, 199)); // EPS(电动助力系统)状态
Integer mcuStatus = Integer.valueOf(trainMsg.substring(199, 200)); // MCU(电机/逆变器)状态 Integer absStatus = Integer.valueOf(trainMsg.substring(199, 200)); // ABS(防抱死)状态
Integer heatingStatus = Integer.valueOf(trainMsg.substring(200, 201)); // 动力电池加热状态 Integer mcuStatus = Integer.valueOf(trainMsg.substring(200, 201)); // MCU(电机/逆变器)状态
Integer batteryStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池当前状态 Integer heatingStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池加热状态
Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池保温状态 Integer batteryStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池当前状态
Integer dcdcStatus = Integer.valueOf(trainMsg.substring(203, 204)); // DCDC(电力交换系统)状态 Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(203, 204)); // 动力电池保温状态
Integer chgStatus = Integer.valueOf(trainMsg.substring(204, 205)); // CHG(充电机)状态 Integer dcdcStatus = Integer.valueOf(trainMsg.substring(204, 205)); // DCDC(电力交换系统)状态
Integer chgStatus = Integer.valueOf(trainMsg.substring(205, 206)); // CHG(充电机)状态
Vehicle vehicle = new Vehicle(vin, Vehicle vehicle = new Vehicle(vin,
startTime, startTime,
@ -137,6 +138,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
vehicleStatus, vehicleStatus,
chargingStatus, chargingStatus,
operatingStatus, operatingStatus,
socStatus,
chargingEnergyStorageStatus, chargingEnergyStorageStatus,
driveMotorStatus, driveMotorStatus,
positionStatus, positionStatus,

View File

@ -73,6 +73,9 @@ spring:
# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数 # 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数
# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况 # 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 3 max-poll-records: 3
# 心跳时间间隔默认值为10s
heartbeat-interval:
ms: 10000
properties: properties:
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance # 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
max: max:
@ -94,6 +97,7 @@ spring:
poll-timeout: 600000 poll-timeout: 600000
# forest配置 # forest配置
forest: forest:
backend: okhttp3 # 后端HTTP框架默认为 okhttp3 backend: okhttp3 # 后端HTTP框架默认为 okhttp3