diff --git a/src/main/java/com/mobai/cofig/KafkaConsumerConfig.java b/src/main/java/com/mobai/cofig/KafkaConsumerConfig.java index ba7d1ec..5312a2a 100644 --- a/src/main/java/com/mobai/cofig/KafkaConsumerConfig.java +++ b/src/main/java/com/mobai/cofig/KafkaConsumerConfig.java @@ -1,37 +1,37 @@ -package com.mobai.cofig; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; -import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; - - -import java.util.HashMap; -import java.util.Map; - -@Configuration -public class KafkaConsumerConfig { - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); - return factory; - } - - @Bean - public DefaultKafkaConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-server"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); - - return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new StringDeserializer())); - } -} +//package com.mobai.cofig; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.SeekToCurrentErrorHandler; +//import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; +// +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//public class KafkaConsumerConfig { +// +// @Bean +// public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setErrorHandler(new SeekToCurrentErrorHandler()); +// return factory; +// } +// +// @Bean +// public DefaultKafkaConsumerFactory consumerFactory() { +// Map props = new HashMap<>(); +// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-server"); +// props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id"); +// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); +// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); +// +// return new DefaultKafkaConsumerFactory<>(props, new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new StringDeserializer())); +// } +//} diff --git a/src/main/java/com/mobai/domian/Vehicle.java b/src/main/java/com/mobai/domian/Vehicle.java index bdde97b..b294c36 100644 --- a/src/main/java/com/mobai/domian/Vehicle.java +++ b/src/main/java/com/mobai/domian/Vehicle.java @@ -5,6 +5,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; import java.math.BigDecimal; /** @@ -18,7 +19,7 @@ import java.math.BigDecimal; @Builder @NoArgsConstructor @AllArgsConstructor -public class Vehicle { +public class Vehicle implements Serializable { /** * 车辆vin */ diff --git a/src/main/java/com/mobai/kafka/KafkaConsumer.java b/src/main/java/com/mobai/kafka/KafkaConsumer.java index 17b8e91..740c200 100644 --- a/src/main/java/com/mobai/kafka/KafkaConsumer.java +++ b/src/main/java/com/mobai/kafka/KafkaConsumer.java @@ -13,20 +13,20 @@ import java.util.List; public class KafkaConsumer { //监听消费 - @KafkaListener(topics = {"sb_topic"}) + @KafkaListener(topics = {"topic0"}) public void onNormalMessage(ConsumerRecord record) { System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + record.value()); } - - //批量消费 - @KafkaListener(id = "consumer2", topics = {"sb_topic","mobai-mq"}, groupId = "sb_group") - public void onBatchMessage(List> records) { - System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size()); - for (ConsumerRecord record : records) { - System.out.println(record.value()); - } - } +// +// //批量消费 +// @KafkaListener(id = "consumer2", topics = {"topic0","topic1"}, groupId = "sb_group") +// public void onBatchMessage(List> records) { +// System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size()); +// for (ConsumerRecord record : records) { +// System.out.println(record.value()); +// } +// } } diff --git a/src/main/java/com/mobai/kafka/KafkaPCUtils.java b/src/main/java/com/mobai/kafka/KafkaPCUtils.java index 050017e..1af26c9 100644 --- a/src/main/java/com/mobai/kafka/KafkaPCUtils.java +++ b/src/main/java/com/mobai/kafka/KafkaPCUtils.java @@ -1,5 +1,6 @@ package com.mobai.kafka; +import com.alibaba.fastjson.JSON; import com.mobai.domian.Vehicle; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -41,14 +42,15 @@ public class KafkaPCUtils { properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 创建生产者 - KafkaProducer producer = new KafkaProducer<>(properties); + KafkaProducer producer = new KafkaProducer<>(properties); // 创建消息 // ProducerRecord record = new ProducerRecord<>(topicName,vehicle.getVin(), vehicle.toString()); - ProducerRecord record = new ProducerRecord<>("my_topic", "key", vehicle); + ProducerRecord record = new ProducerRecord<>(topic, vehicle.getVin(), JSON.toJSONString(vehicle)); record.headers().add(new RecordHeader("type", "String".getBytes(StandardCharsets.UTF_8))); - producer.send(record); + // 只发送无回执 +// producer.send(record); // record.partition(); - // 发送消息 + // 发送消息 有回执 producer.send(record,(matadata,exception)->{ if (exception == null) { log.info("消息发送成功,topic:[{}]",topic); @@ -58,7 +60,6 @@ public class KafkaPCUtils { }); // 关闭生产者 producer.close(); - log.info("发送成功,topic:【{}】",topic); } // public void sendCallbackOneMessage(String topic, Vehicle vehicle) { @@ -116,8 +117,10 @@ public class KafkaPCUtils { // } //监听消费 - @KafkaListener(topics = {"topic0","topic1","sb_topic"}) + @KafkaListener(topics = {"topic0","topic1"}) public void onNormalMessage1(ConsumerRecord record) { + String value = (String) record.value(); + JSON.parseObject(value, Vehicle.class); System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + record.value()); } diff --git a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java index 9e45382..377f221 100644 --- a/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/mobai/service/impl/MqttCallBackServiceImpl.java @@ -1,6 +1,7 @@ package com.mobai.service.impl; import com.mobai.domian.Vehicle; +//import com.mobai.kafka.KafkaPCUtils; import com.mobai.kafka.KafkaPCUtils; import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f33be66..7354dc6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -13,8 +13,6 @@ spring: producer: # Kafka服务器 bootstrap-servers: 127.0.0.1:9092 - bootstrap: - servers: 127.0.0.1:9092 # 开启事务,必须在开启了事务的方法中发送,否则报错 # transaction-id-prefix: kafkaTx- # 发生错误后,消息重发的次数,开启事务必须设置大于0。 @@ -29,9 +27,11 @@ spring: # 生产者内存缓冲区的大小。 buffer-memory: 1024000 # 键的序列化方式 - key-serializer: org.springframework.kafka.support.serializer.JsonSerializer - # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer +# key-serializer: org.springframework.kafka.support.serializer.JsonSerializer +# # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) +# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger: ms: 2000 # 延迟提交 @@ -52,15 +52,20 @@ spring: enable-auto-commit: false # 键的反序列化方式 #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer +# key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类) - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer +# value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 properties: spring: json: trusted: packages: "*" + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,