diff --git a/pom.xml b/pom.xml
index 855a155..51b91d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,20 @@
8.0.11
runtime
+
+
+
+
+
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.8.0
+
+
+
org.mybatis.spring.boot
diff --git a/src/main/java/com/hyc/controller/SummarizeController.java b/src/main/java/com/hyc/controller/SummarizeController.java
index 8c3c221..1bfef50 100644
--- a/src/main/java/com/hyc/controller/SummarizeController.java
+++ b/src/main/java/com/hyc/controller/SummarizeController.java
@@ -1,15 +1,24 @@
package com.hyc.controller;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.hyc.domain.ConnectionParameter;
import com.hyc.domain.SummarizeResp;
import com.hyc.result.Result;
import com.hyc.service.SummarizeService;
+import com.hyc.util.CalculateCheckDigit;
+import com.hyc.util.ConversionUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
/**
@@ -80,6 +89,41 @@ public class SummarizeController {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
+ String resultString = ConversionUtil.hexStringToString(new String(message.getPayload()));
+ log.warn("解析后的字符串是:{}",resultString);
+ log.warn("长度为:{}",resultString.length());
+
+ int count =0;
+ String realString = resultString.substring(1, 207);
+
+ int[] intArr ={17,13,11,10,6,11,6,5,9,1,
+ 2,2,5,6,5,4,6,5,8,6,
+ 6,6,2,5,6,4,4,6,6,6,
+ 1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1};
+ String[] strArr = {"vin","startTime","longitude","latitude","speed","mileage","voltage","current",
+ "resistance","gear","accelerationPedal","brakePedal",
+ "fuelConsumptionRate","motorControllerTemperature","motorSpeed","motorTorque",
+ "motorTemperature","motorVoltage","motorCurrent","remainingBattery","maximumFeedbackPower","maximumDischargePower",
+ "selfCheckCounter","totalBatteryCurrent","totalBatteryVoltage","singleBatteryMaxVoltage","singleBatteryMinVoltage",
+ "singleBatteryMaxTemperature","singleBatteryMinTemperature","availableBatteryCapacity","vehicleStatus",
+ "chargingStatus","operatingStatus","socStatus","chargingEnergyStorageStatus","driveMotorStatus",
+ "positionStatus","easStatus","ptcStatus","epsStatus","absStatus","mcuStatus",
+ "heatingStatus","batteryStatus","batteryInsulationStatus","dcdcStatus","chgStatus"};
+ LinkedHashMap linkedHashMap = new LinkedHashMap<>();
+ for (int i = 0; i < 47; i++) {
+ String substring = realString.substring(count, count + intArr[i]);
+ linkedHashMap.put(strArr[i],substring);
+ count = count + intArr[i];
+ }
+ log.warn("hashMap:{}",linkedHashMap);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ String json = objectMapper.writeValueAsString(linkedHashMap);
+ log.error("json格式:{}",json);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
}
@@ -98,4 +142,5 @@ public class SummarizeController {
return true;
}
+
}
diff --git a/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java b/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..b39ee0d
--- /dev/null
+++ b/src/main/java/com/hyc/kafka/demo/config/KafkaConsumerConfig.java
@@ -0,0 +1,112 @@
+package com.hyc.kafka.demo.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author 李帆
+ * @date 2022/10/31 18:05
+ * kafka配置,也可以写在yml,这个文件会覆盖yml
+ */
+@Configuration
+public class KafkaConsumerConfig {
+
+ @Value("${spring.kafka.consumer.bootstrap-servers}")
+ private String bootstrapServers;
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
+ private boolean enableAutoCommit;
+ @Value("${spring.kafka.properties.session.timeout.ms}")
+ private String sessionTimeout;
+ @Value("${spring.kafka.properties.max.poll.interval.ms}")
+ private String maxPollIntervalTime;
+ @Value("${spring.kafka.consumer.max-poll-records}")
+ private String maxPollRecords;
+ @Value("${spring.kafka.consumer.auto-offset-reset}")
+ private String autoOffsetReset;
+ @Value("${spring.kafka.listener.concurrency}")
+ private Integer concurrency;
+ @Value("${spring.kafka.listener.missing-topics-fatal}")
+ private boolean missingTopicsFatal;
+ @Value("${spring.kafka.listener.poll-timeout}")
+ private long pollTimeout;
+
+ @Bean
+ public Map consumerConfigs() {
+
+ Map propsMap = new HashMap<>(16);
+ propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+ propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+ //自动提交的时间间隔,自动提交开启时生效
+ propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
+ //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+ //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
+ //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
+ //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
+ propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+ //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
+ //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
+ //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
+ //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
+ //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
+ //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
+ //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
+ propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+ //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
+ propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
+ //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+ return propsMap;
+ }
+
+ @Bean
+ public ConsumerFactory