fix(): 修改了报文解析功能,能根据车辆对应的解析规范,对报文进行解析

master
yaoxin 2024-06-26 22:25:26 +08:00
parent 3ef21f1a29
commit 70ce3ee316
18 changed files with 502 additions and 122 deletions

4
.gitignore vendored
View File

@ -2,7 +2,7 @@ HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
!**/src/vehicleKafka/**/target/
### STS ###
.apt_generated
@ -27,7 +27,7 @@ target/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
!**/src/vehicleKafka/**/build/
### VS Code ###
.vscode/

View File

@ -17,6 +17,11 @@
<java.version>17</java.version>
</properties>
<dependencies>
<!-- SpringBoot Boot Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>

View File

@ -0,0 +1,40 @@
package com.muyu.mqttmessage.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
/**
* @ClassName AnalyzeConfigInfo
* @Description
* @Author Xin.Yao
* @Date 2024/6/26 2:08
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder
public class AnalyzeConfigInfo {
/**
*
*/
private Integer startPosition;
/**
*
*/
private Integer endPosition;
/**
* key
*/
private String attributeKey;
/**
*
*/
private String label;
/**
*
*/
private String type;
}

View File

@ -9,9 +9,8 @@ import lombok.Data;
* @Date 2024/6/9 10:56
*/
@Data
public class Test {
public class VehicleKafka {
private Integer partitions;
private String key;
private String data;
private String consumerName;
}

View File

@ -0,0 +1,112 @@
package com.muyu.mqttmessage.common.rest;
import com.muyu.mqttmessage.constants.Constants;
import com.muyu.mqttmessage.constants.HttpStatus;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
*
*
* @author muyu
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Result<T> implements Serializable {
/**
*
*/
public static final int SUCCESS = Constants.SUCCESS;
/**
*
*/
public static final int FAIL = Constants.FAIL;
/**
*
*/
public static final int WARN = HttpStatus.WARN;
private static final long serialVersionUID = 1L;
private int code;
private String msg;
private T data;
public static <T> Result<T> success () {
return restResult(null, SUCCESS, null);
}
public static <T> Result<T> success (T data) {
return restResult(data, SUCCESS, null);
}
public static <T> Result<T> success (T data, String msg) {
return restResult(data, SUCCESS, msg);
}
public static <T> Result<T> error () {
return restResult(null, FAIL, null);
}
public static <T> Result<T> error (String msg) {
return restResult(null, FAIL, msg);
}
public static <T> Result<T> error (T data) {
return restResult(data, FAIL, null);
}
public static <T> Result<T> error (T data, String msg) {
return restResult(data, FAIL, msg);
}
public static <T> Result<T> error (int code, String msg) {
return restResult(null, code, msg);
}
public static <T> Result<T> warn () {
return restResult(null, WARN, null);
}
public static <T> Result<T> warn (String msg) {
return restResult(null, WARN, msg);
}
public static <T> Result<T> warn (T data) {
return restResult(data, WARN, null);
}
public static <T> Result<T> warn (T data, String msg) {
return restResult(data, WARN, msg);
}
public static <T> Result<T> warn (int code, String msg) {
return restResult(null, code, msg);
}
private static <T> Result<T> restResult (T data, int code, String msg) {
return Result.<T>builder()
.code(code)
.data(data)
.msg(msg)
.build();
}
public static <T> Boolean isError (Result<T> ret) {
return !isSuccess(ret);
}
public static <T> Boolean isSuccess (Result<T> ret) {
return Result.SUCCESS == ret.getCode();
}
}

View File

@ -1,15 +0,0 @@
package com.muyu.mqttmessage.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
/**
* @ClassName DataAccessClientConfig
* @Description
* @Author Xin.Yao
* @Date 2024/5/9 19:52
*/
@ComponentScan
@Import({MqttMessageRunner.class})
public class MqttMessageConfig {
}

View File

@ -1,22 +0,0 @@
package com.muyu.mqttmessage.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* @ClassName DataAccessClientRunner
* @Description
* @Author Xin.Yao
* @Date 2024/5/9 19:53
*/
@Log4j2
@Component
public class MqttMessageRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
}
}

View File

@ -0,0 +1,134 @@
package com.muyu.mqttmessage.constants;
/**
*
*
* @author muyu
*/
public class Constants {
/**
* UTF-8
*/
public static final String UTF8 = "UTF-8";
/**
* GBK
*/
public static final String GBK = "GBK";
/**
* www
*/
public static final String WWW = "www.";
/**
* RMI
*/
public static final String LOOKUP_RMI = "rmi:";
/**
* LDAP
*/
public static final String LOOKUP_LDAP = "ldap:";
/**
* LDAPS
*/
public static final String LOOKUP_LDAPS = "ldaps:";
/**
* http
*/
public static final String HTTP = "http://";
/**
* https
*/
public static final String HTTPS = "https://";
/**
*
*/
public static final Integer SUCCESS = 200;
/**
*
*/
public static final Integer FAIL = 500;
/**
*
*/
public static final String LOGIN_SUCCESS_STATUS = "0";
/**
*
*/
public static final String LOGIN_FAIL_STATUS = "1";
/**
*
*/
public static final String LOGIN_SUCCESS = "Success";
/**
*
*/
public static final String LOGOUT = "Logout";
/**
*
*/
public static final String REGISTER = "Register";
/**
*
*/
public static final String LOGIN_FAIL = "Error";
/**
*
*/
public static final String PAGE_NUM = "pageNum";
/**
*
*/
public static final String PAGE_SIZE = "pageSize";
/**
*
*/
public static final String ORDER_BY_COLUMN = "orderByColumn";
/**
* "desc" "asc".
*/
public static final String IS_ASC = "isAsc";
/**
*
*/
public static final long CAPTCHA_EXPIRATION = 2;
/**
*
*/
public static final String RESOURCE_PREFIX = "/profile";
/**
* json
*/
public static final String[] JSON_WHITELIST_STR = {"org.springframework", "com.muyu"};
/**
* 访
*/
public static final String[] JOB_WHITELIST_STR = {"com.muyu"};
/**
*
*/
public static final String[] JOB_ERROR_STR = {"java.net.URL", "javax.naming.InitialContext", "org.yaml.snakeyaml",
"org.springframework", "org.apache", "com.muyu.common.core.utils.file"};
}

View File

@ -0,0 +1,93 @@
package com.muyu.mqttmessage.constants;
/**
*
*
* @author muyu
*/
public class HttpStatus {
/**
*
*/
public static final int SUCCESS = 200;
/**
*
*/
public static final int CREATED = 201;
/**
*
*/
public static final int ACCEPTED = 202;
/**
*
*/
public static final int NO_CONTENT = 204;
/**
*
*/
public static final int MOVED_PERM = 301;
/**
*
*/
public static final int SEE_OTHER = 303;
/**
*
*/
public static final int NOT_MODIFIED = 304;
/**
*
*/
public static final int BAD_REQUEST = 400;
/**
*
*/
public static final int UNAUTHORIZED = 401;
/**
* 访
*/
public static final int FORBIDDEN = 403;
/**
*
*/
public static final int NOT_FOUND = 404;
/**
* http
*/
public static final int BAD_METHOD = 405;
/**
*
*/
public static final int CONFLICT = 409;
/**
*
*/
public static final int UNSUPPORTED_TYPE = 415;
/**
*
*/
public static final int ERROR = 500;
/**
*
*/
public static final int NOT_IMPLEMENTED = 501;
/**
*
*/
public static final int WARN = 601;
}

View File

@ -0,0 +1,11 @@
package com.muyu.mqttmessage.constants;
/**
* @ClassName RedisConstants
* @Description redis
* @Author Xin.Yao
* @Date 2024/6/26 2:12
*/
public class RedisConstants {
public static final String ANALYZE_CONFIG = "analyze_config";
}

View File

@ -1,17 +1,13 @@
package com.muyu.mqttmessage.consumer;
import com.muyu.mqttmessage.common.Test;
import com.muyu.mqttmessage.common.VehicleKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@ -29,7 +25,7 @@ public class KafkaConsumers {
public KafkaConsumer kafkaConsumer(Test test){
public KafkaConsumer kafkaConsumer(VehicleKafka vehicleKafka){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@ -41,7 +37,7 @@ public class KafkaConsumers {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题分区
List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("testKafka", test.getPartitions()));
topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions()));
consumer.assign(topicPartitions);
return consumer;

View File

@ -5,10 +5,12 @@ import com.muyu.mqttmessage.common.MqttMessageModel;
import com.muyu.mqttmessage.config.MqttFactory;
import com.muyu.mqttmessage.constants.RabbitMqConstant;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.NewTopic;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -25,11 +27,14 @@ public class RabbitConsumer {
private KafkaTemplate<String,Object> kafkaTemplate;
@Autowired
private MqttFactory mqttFactory;
@Autowired
private KafkaAdmin kafkaAdmin;
@RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)})
public void monitorServer(String msg){
log.info("监听到的消息:{}",msg);
MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
MqttClient mqttClient = mqttFactory.createMqttClient(mqttMessageModel);
mqttFactory.createMqttClient(mqttMessageModel);
kafkaAdmin.createOrModifyTopics(new NewTopic(mqttMessageModel.getBroker(),8,(short) 1));
log.info("{}服务器监听连接成功",mqttMessageModel.getTopic());
}

View File

@ -1,8 +1,7 @@
package com.muyu.mqttmessage.controller;
import com.muyu.mqttmessage.common.Test;
import com.muyu.mqttmessage.common.VehicleKafka;
import com.muyu.mqttmessage.service.MqttKafkaService;
import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@ -18,18 +17,19 @@ public class TestController {
@Autowired
private MqttKafkaService mqttKafkaService;
@GetMapping("/Test")
public void Test(@RequestBody Test test) {
mqttKafkaService.test(test);
}
@GetMapping("/Consumer")
public void consumer(@RequestBody Test test) {
mqttKafkaService.consumer(test);
public void consumer(@RequestBody VehicleKafka vehicleKafka) {
mqttKafkaService.consumer(vehicleKafka);
}
@GetMapping("/CloseConsumer")
public void closeConsumer(@RequestParam("consumerName") String consumerName) {
mqttKafkaService.closeConsumer(consumerName);
}
@GetMapping("/Test")
public void test() {
mqttKafkaService.test();
}
}

View File

@ -23,58 +23,72 @@ public class VehicleData {
* VIN
*/
private String vin;
/**
* 线
*/
private String drivingRoute;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
/**
*
*/
private String current;
/**
*
*/
private String resistance;
/**
*
*/
private String gear = "P";
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
@ -244,4 +258,5 @@ public class VehicleData {
* CHG()
*/
private int chgStatus = 1;
}

View File

@ -1,7 +1,6 @@
package com.muyu.mqttmessage.service;
import com.muyu.mqttmessage.common.Test;
import org.springframework.stereotype.Component;
import com.muyu.mqttmessage.common.VehicleKafka;
/**
* @ClassName MqttKafkaService
@ -10,9 +9,10 @@ import org.springframework.stereotype.Component;
* @Date 2024/6/9 11:05
*/
public interface MqttKafkaService {
void test(Test test);
void consumer(Test test);
void consumer(VehicleKafka vehicleKafka);
void closeConsumer(String consumerName);
void test();
}

View File

@ -1,19 +1,24 @@
package com.muyu.mqttmessage.service.impl;
import com.alibaba.fastjson2.JSON;
import com.muyu.mqttmessage.common.Test;
import com.muyu.mqttmessage.common.AnalyzeConfigInfo;
import com.muyu.mqttmessage.common.VehicleKafka;
import com.muyu.mqttmessage.constants.RedisConstants;
import com.muyu.mqttmessage.domain.VehicleData;
import com.muyu.mqttmessage.utils.ConversionUtil;
import kong.unirest.json.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName MqttCallBackConfig
@ -24,11 +29,14 @@ import java.math.BigDecimal;
@Component
@Log4j2
public class MqttCallBackServiceImpl implements MqttCallback {
private KafkaTemplate<String,Object> kafkaTemplate;
public MqttCallBackServiceImpl(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Autowired
private RedisTemplate<String,String> redisTemplate;
// @Autowired
@ -42,14 +50,10 @@ public class MqttCallBackServiceImpl implements MqttCallback {
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
VehicleData vehicleData = getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())));
String jsonString = JSON.toJSONString(vehicleData);
log.info("转化为对象:{}",jsonString);
Test test = new Test();
test.setPartitions(1);
test.setKey("123");
test.setData(jsonString);
kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData());
JSONObject jsonObject = getJsonObject(ConversionUtil.hexStringToString(new String(message.getPayload())));
Object o = redisTemplate.opsForHash().get("vehicleKafka", jsonObject.get("vin"));
VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
kafkaTemplate.send(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions(), vehicleKafka.getKey(),jsonObject.toString());
}catch (Exception e){
e.printStackTrace();
}
@ -61,6 +65,21 @@ public class MqttCallBackServiceImpl implements MqttCallback {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public JSONObject getJsonObject(String message) {
message = message.substring(1,message.length()-2);
StringBuffer stringBuffer = new StringBuffer();
Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG,message.substring(0,17));
List<AnalyzeConfigInfo> analyzeConfigInfos = new ArrayList<>();
analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList();
String finalMessage = message;
analyzeConfigInfos.forEach(analyzeConfigInfo -> {
stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ removeSuperfluousDigit(finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition()))+"\"");
});
String jsonString = "{"+stringBuffer.substring(1)+"}";
log.info("解析后的数据:{}",jsonString);
return new JSONObject(jsonString);
}
public VehicleData getVehicleData(String message) {
message = message.substring(1,message.length()-2);
return VehicleData.builder()
@ -161,7 +180,6 @@ public class MqttCallBackServiceImpl implements MqttCallback {
.build();
}
public String removeSuperfluousDigit(String str){
if(str.length()>1){
if(str.charAt(0)=='0'){

View File

@ -1,18 +1,25 @@
package com.muyu.mqttmessage.service.impl;
import com.muyu.mqttmessage.common.Test;
import com.alibaba.fastjson2.JSON;
import com.muyu.mqttmessage.common.AnalyzeConfigInfo;
import com.muyu.mqttmessage.common.VehicleKafka;
import com.muyu.mqttmessage.constants.RedisConstants;
import com.muyu.mqttmessage.consumer.KafkaConsumers;
import com.muyu.mqttmessage.service.MqttKafkaService;
import com.muyu.mqttmessage.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -30,20 +37,19 @@ public class MqttKafkaServiceImpl implements MqttKafkaService {
@Autowired
private KafkaConsumers kafkaConsumers;
@Override
public void test(Test test) {
kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData());
}
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Override
public void consumer(Test test) {
KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(test);
consumerMap.put(test.getConsumerName(),true);
while (consumerMap.containsKey(test.getConsumerName())){
public void consumer(VehicleKafka vehicleKafka) {
KafkaConsumer consumer = kafkaConsumers.kafkaConsumer(vehicleKafka);
consumerMap.put(vehicleKafka.getConsumerName(),true);
while (consumerMap.containsKey(vehicleKafka.getConsumerName())){
// 拉取消息
ConsumerRecords<String, String> msg = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : msg) {
log.info("{}监听到的消息内容: {}",test.getConsumerName(),consumerRecord.value());
log.info("{}监听到的消息内容: {}", vehicleKafka.getConsumerName(),consumerRecord.value());
}
}
consumer.close();
@ -53,4 +59,22 @@ public class MqttKafkaServiceImpl implements MqttKafkaService {
public void closeConsumer(String consumerName) {
consumerMap.remove(consumerName);
}
@Override
public void test() {
String message = new String();
message = "7E 56 49 4e 31 32 33 34 35 36 37 38 39 44 49 4a 45 34 31 37 31 37 35 37 34 33 39 33 35 33 33 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 32 33 2e 36 39 36 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 50 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 34 35 30 30 30 2e 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 14 7E";
String str = ConversionUtil.hexStringToString(message);
message = str.substring(1,str.length()-2);
StringBuffer stringBuffer = new StringBuffer();
Object o = redisTemplate.opsForHash().get(RedisConstants.ANALYZE_CONFIG, "vin1");
List<AnalyzeConfigInfo> analyzeConfigInfos = new ArrayList<>();
analyzeConfigInfos=JSON.parseObject(o.toString(), ArrayList.class).stream().map(obj -> JSON.parseObject(obj.toString(), AnalyzeConfigInfo.class)).toList();
String finalMessage = message;
analyzeConfigInfos.forEach(analyzeConfigInfo -> {
stringBuffer.append(",\""+analyzeConfigInfo.getAttributeKey()+"\":\""+ finalMessage.substring(analyzeConfigInfo.getStartPosition()-1,analyzeConfigInfo.getEndPosition())+"\"");
});
String jsonString = "{"+stringBuffer.substring(1)+"}";
log.info("解析后的数据:{}",jsonString);
}
}

View File

@ -1,4 +1,8 @@
spring:
redis:
host: 47.99.219.99
port: 6379
password: yx@123
application:
name: mqtt-message
jackson:
@ -9,51 +13,12 @@ spring:
password: guest
virtualHost: /
port: 5672
host: 43.142.44.217
host: 47.99.219.99
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
# kafka:
# bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
# producer:
# batch-size: 16384 #批量大小
# acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
# retries: 10 # 消息发送重试次数
# #transaction-id-prefix: transaction
# buffer-memory: 33554432
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# properties:
# partitioner:
# class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
# linger:
# ms: 2000 #提交延迟
# #partitioner: #指定分区器
# #class: pers.zhang.config.CustomerPartitionHandler
# consumer:
# group-id: testGroup #默认的消费组ID
# enable-auto-commit: true #是否自动提交offset
# auto-commit-interval: 2000 #提交offset延时
# # 当kafka中没有初始offset或offset超出范围时将自动重置offset
# # earliest:重置为分区中最小的offset;
# # latest:重置为分区中最新的offset(消费分区中新产生的数据);
# # none:只要有一个分区不存在已提交的offset,就抛出异常;
# auto-offset-reset: latest
# max-poll-records: 500 #单次拉取消息的最大条数
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# properties:
# session:
# timeout:
# ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
# request:
# timeout:
# ms: 18000 # 消费请求的超时时间
# listener:
# missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
# # type: batch
kafka:
#config/consumer.properties配置的bootstrap.servers