diff --git a/.idea/vcs.xml b/.idea/vcs.xml index f4cd3ff..94a25f7 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,7 +1,6 @@ - \ No newline at end of file diff --git a/src/main/java/com/mobai/domian/Vehicle.java b/src/main/java/com/mobai/domian/Vehicle.java index 8aaa5c4..790449f 100644 --- a/src/main/java/com/mobai/domian/Vehicle.java +++ b/src/main/java/com/mobai/domian/Vehicle.java @@ -155,10 +155,14 @@ public class Vehicle implements Serializable { /** * SOC */ - private Integer chargingEnergyStorageStatus; + private Integer socStatus; /** * 可充电储能装置工作状态 */ + private Integer chargingEnergyStorageStatus; + /** + * 驱动电机状态 + */ private Integer driveMotorStatus; /** * 定位是否有效 diff --git a/src/main/java/com/mobai/kafka/KafkaPCUtils.java b/src/main/java/com/mobai/kafka/KafkaPCUtils.java index cba2fc2..9f53c1f 100644 --- a/src/main/java/com/mobai/kafka/KafkaPCUtils.java +++ b/src/main/java/com/mobai/kafka/KafkaPCUtils.java @@ -7,16 +7,22 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.Heartbeat; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.util.Maps; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; @@ -45,6 +51,8 @@ public class KafkaPCUtils { properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,10000); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000); // properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class.getName()); // 创建生产者 KafkaProducer producer = new KafkaProducer<>(properties); @@ -61,7 +69,6 @@ public class KafkaPCUtils { for (int i = 0; i < 8; i++) { partitionInfos.add(new PartitionInfo(topic,i,null,null,null)); } - int partition = new CustomizePartitioner().partition( topic, vin, @@ -97,13 +104,20 @@ public class KafkaPCUtils { //监听消费 @KafkaListener(topics = {"topic0", "topic1"}) - public void onNormalMessage1(ConsumerRecord record) { + public void onNormalMessage1(ConsumerRecord record, Acknowledgment acknowledgment) { String value = (String) record.value(); JSON.parseObject(value, Vehicle.class); + System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + record.value()); +// Map commits = Maps.newHashMap( +// new TopicPartition(record.topic(),record.partition()), +// new OffsetAndMetadata(record.offset())); + acknowledgment.acknowledge(); } + + // //批量消费 // @KafkaListener(id = "consumer2", topics = {"topic1"}, groupId = "sb_group") // public void onBatchMessage(List> records) { diff --git a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java index 6cdae06..c605fb1 100644 --- a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java @@ -90,19 +90,20 @@ public class MqttCallBackServiceImpl implements MqttCallback { Integer vehicleStatus = Integer.valueOf(trainMsg.substring(189, 190)); // 车辆状态 Integer chargingStatus = Integer.valueOf(trainMsg.substring(190, 191)); // 充电状态 Integer operatingStatus = Integer.valueOf(trainMsg.substring(191, 192)); // 运行状态 - Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC - Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 可充电储能装置工作状态 - Integer positionStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 定位是否有效 - Integer easStatus = Integer.valueOf(trainMsg.substring(195, 196)); // EAS(汽车防盗系统)状态 - Integer ptcStatus = Integer.valueOf(trainMsg.substring(196, 197)); // PTC(主动安全系统)状态 - Integer epsStatus = Integer.valueOf(trainMsg.substring(197, 198)); // EPS(电动助力系统)状态 - Integer absStatus = Integer.valueOf(trainMsg.substring(198, 199)); // ABS(防抱死)状态 - Integer mcuStatus = Integer.valueOf(trainMsg.substring(199, 200)); // MCU(电机/逆变器)状态 - Integer heatingStatus = Integer.valueOf(trainMsg.substring(200, 201)); // 动力电池加热状态 - Integer batteryStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池当前状态 - Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池保温状态 - Integer dcdcStatus = Integer.valueOf(trainMsg.substring(203, 204)); // DCDC(电力交换系统)状态 - Integer chgStatus = Integer.valueOf(trainMsg.substring(204, 205)); // CHG(充电机)状态 + Integer socStatus = Integer.valueOf(trainMsg.substring(192, 193)); // SOC + Integer chargingEnergyStorageStatus = Integer.valueOf(trainMsg.substring(193, 194)); // 驱动电机状态 + Integer driveMotorStatus = Integer.valueOf(trainMsg.substring(194, 195)); // 可充电储能装置工作状态 + Integer positionStatus = Integer.valueOf(trainMsg.substring(195, 196)); // 定位是否有效 + Integer easStatus = Integer.valueOf(trainMsg.substring(196, 197)); // EAS(汽车防盗系统)状态 + Integer ptcStatus = Integer.valueOf(trainMsg.substring(197, 198)); // PTC(主动安全系统)状态 + Integer epsStatus = Integer.valueOf(trainMsg.substring(198, 199)); // EPS(电动助力系统)状态 + Integer absStatus = Integer.valueOf(trainMsg.substring(199, 200)); // ABS(防抱死)状态 + Integer mcuStatus = Integer.valueOf(trainMsg.substring(200, 201)); // MCU(电机/逆变器)状态 + Integer heatingStatus = Integer.valueOf(trainMsg.substring(201, 202)); // 动力电池加热状态 + Integer batteryStatus = Integer.valueOf(trainMsg.substring(202, 203)); // 动力电池当前状态 + Integer batteryInsulationStatus = Integer.valueOf(trainMsg.substring(203, 204)); // 动力电池保温状态 + Integer dcdcStatus = Integer.valueOf(trainMsg.substring(204, 205)); // DCDC(电力交换系统)状态 + Integer chgStatus = Integer.valueOf(trainMsg.substring(205, 206)); // CHG(充电机)状态 Vehicle vehicle = new Vehicle(vin, startTime, @@ -137,6 +138,7 @@ public class MqttCallBackServiceImpl implements MqttCallback { vehicleStatus, chargingStatus, operatingStatus, + socStatus, chargingEnergyStorageStatus, driveMotorStatus, positionStatus, diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index aa0fe20..a739fc6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -73,6 +73,9 @@ spring: # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 max-poll-records: 3 + # 心跳时间间隔,默认值为10s + heartbeat-interval: + ms: 10000 properties: # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance max: @@ -94,6 +97,7 @@ spring: poll-timeout: 600000 + # forest配置 forest: backend: okhttp3 # 后端HTTP框架(默认为 okhttp3)