feat: send parsed packets to Kafka partitions and consume them

master
yaoxin 2024-06-10 22:42:36 +08:00
parent 7d2f40d6f0
commit 3ef21f1a29
8 changed files with 177 additions and 38 deletions

View File

@@ -1,9 +1,13 @@
 package com.muyu.mqttmessage;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.ComponentScans;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 
 @SpringBootApplication
 public class MqttMessageApplication {
@@ -12,5 +16,4 @@ public class MqttMessageApplication {
         SpringApplication.run(MqttMessageApplication.class, args);
     }
 }
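The Autowired, Bean, KafkaTemplate and ProducerFactory imports added above point at producer wiring that is not visible in these hunks (the second hunk also drops one line from the class body that is not shown). As orientation only, a bean of the kind those imports would support could look like the following sketch, which simply reuses Spring Boot's auto-configured ProducerFactory; it is not code from this commit.

    // Illustrative sketch, not part of this commit: build a KafkaTemplate from
    // the auto-configured ProducerFactory so other beans can inject it.
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

In practice the spring-kafka Boot starter already exposes an equivalent KafkaTemplate, so an explicit bean like this is only worthwhile when the defaults need customizing.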

View File

@@ -0,0 +1,17 @@
package com.muyu.mqttmessage.common;

import lombok.Data;

/**
 * @ClassName Test
 * @Description Request payload for the Kafka send/consume test endpoints
 * @Author Xin.Yao
 * @Date 2024/6/9 10:56
 */
@Data
public class Test {
    /** Target partition index */
    private Integer partitions;
    /** Record key */
    private String key;
    /** Record payload */
    private String data;
    /** Identifier used to start and stop a consumer loop */
    private String consumerName;
}

View File

@@ -12,21 +12,21 @@ import java.util.Map;
  * @Author Xin.Yao
  * @Date 2024/6/7 8:05
  */
-//@Component
-//public class CustomizePartitioner implements Partitioner {
-//    @Override
-//    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
-//        // Custom partition rule: send every record to partition 0 by default
-//        return 0;
-//    }
-//
-//    @Override
-//    public void close() {
-//
-//    }
-//
-//    @Override
-//    public void configure(Map<String, ?> map) {
-//
-//    }
-//}
+@Component
+public class CustomizePartitioner implements Partitioner {
+    @Override
+    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
+        // Custom partition rule: send every record to partition 0 by default
+        return 0;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+    }
+}
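One detail worth noting: annotating the partitioner with @Component registers it as a Spring bean, but the Kafka producer only picks up a custom Partitioner through its partitioner.class property. A minimal sketch of that registration, assuming a programmatic DefaultKafkaProducerFactory; the broker address and serializers are assumptions, and only ProducerConfig.PARTITIONER_CLASS_CONFIG is the essential part.

    // Illustrative sketch, not part of this commit: point the producer at the
    // custom partitioner via the standard partitioner.class property.
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

The same property can equally be set in application.yml under spring.kafka.producer.properties.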

View File

@@ -1,5 +1,6 @@
 package com.muyu.mqttmessage.config.kafkaconfig;
 
+import lombok.extern.log4j.Log4j2;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 import org.springframework.util.concurrent.FailureCallback;
@@ -11,21 +12,24 @@ import org.springframework.util.concurrent.SuccessCallback;
  * @Author Xin.Yao
  * @Date 2024/6/7 7:51
  */
-//@Component
-//public class KafkaCallback implements SuccessCallback<SendResult<String, Object>>, FailureCallback {
-//    @Override
-//    public void onSuccess(SendResult<String, Object> success) {
-//        // Topic the record was sent to
-//        String topic = success.getRecordMetadata().topic();
-//        // Partition the record landed in
-//        int partition = success.getRecordMetadata().partition();
-//        // Offset of the record within that partition
-//        long offset = success.getRecordMetadata().offset();
-//        System.out.println("Message sent successfully 1: " + topic + "-" + partition + "-" + offset);
-//    }
-//
-//    @Override
-//    public void onFailure(Throwable ex) {
-//        System.out.println("Failed to send message 1: " + ex.getMessage());
-//    }
-//}
+@Component
+@Log4j2
+public class KafkaCallback implements SuccessCallback<SendResult<String, Object>>, FailureCallback {
+    @Override
+    public void onSuccess(SendResult<String, Object> success) {
+        // Topic the record was sent to
+        String topic = success.getRecordMetadata().topic();
+        // Partition the record landed in
+        int partition = success.getRecordMetadata().partition();
+        // Offset of the record within that partition
+        long offset = success.getRecordMetadata().offset();
+        log.info("topic: {} partition: {} offset: {}", topic, partition, offset);
+    }
+
+    @Override
+    public void onFailure(Throwable ex) {
+        System.out.println("Failed to send message 1: " + ex.getMessage());
+    }
+}
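The callback above implements SuccessCallback and FailureCallback, but nothing in the shown hunks attaches it to a send. In spring-kafka 2.x, KafkaTemplate.send(...) returns a ListenableFuture, and one object can serve as both callbacks; a minimal sketch of that wiring, where the topic name, payload and the injected kafkaCallback field are placeholders, not code from this commit:

    // Illustrative sketch, not part of this commit: register the callback on a send.
    kafkaTemplate.send("testKafka", message)
            .addCallback(kafkaCallback, kafkaCallback);

From spring-kafka 3.0 onward send() returns a CompletableFuture instead, so the equivalent would be a whenComplete(...) handler.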

View File

@@ -8,6 +8,8 @@ import lombok.extern.log4j.Log4j2;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.springframework.amqp.rabbit.annotation.Queue;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 /**
@@ -19,11 +21,15 @@ import org.springframework.stereotype.Component;
 @Component
 @Log4j2
 public class RabbitConsumer {
+    @Autowired
+    private KafkaTemplate<String, Object> kafkaTemplate;
+    @Autowired
+    private MqttFactory mqttFactory;
+
     @RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)})
     public void monitorServer(String msg) {
         log.info("Received message: {}", msg);
         MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
-        MqttClient mqttClient = MqttFactory.createMqttClient(mqttMessageModel);
+        MqttClient mqttClient = mqttFactory.createMqttClient(mqttMessageModel);
         log.info("Listener connected for topic {}", mqttMessageModel.getTopic());
     }
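The KafkaTemplate injected above is not used in the lines shown here; per the commit message, the parsed packets are forwarded to a Kafka partition, presumably further down in this class or in the MQTT message callback. A hedged sketch of what such a forwarding step might look like, where the topic name, partition choice and key are assumptions rather than code from this commit:

    // Hypothetical forwarding step: push the received packet to a Kafka partition,
    // keyed by the MQTT topic it arrived on. Topic name and partition are illustrative.
    kafkaTemplate.send("testKafka", 0, mqttMessageModel.getTopic(), msg);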

View File

@@ -0,0 +1,35 @@
package com.muyu.mqttmessage.controller;

import com.muyu.mqttmessage.common.Test;
import com.muyu.mqttmessage.service.MqttKafkaService;
import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @ClassName TestController
 * @Description
 * @Author Xin.Yao
 * @Date 2024/6/9 9:21
 */
@RestController
@RequestMapping("/test")
public class TestController {
    @Autowired
    private MqttKafkaService mqttKafkaService;

    @GetMapping("/Test")
    public void Test(@RequestBody Test test) {
        mqttKafkaService.test(test);
    }

    @GetMapping("/Consumer")
    public void consumer(@RequestBody Test test) {
        mqttKafkaService.consumer(test);
    }

    @GetMapping("/CloseConsumer")
    public void closeConsumer(@RequestParam("consumerName") String consumerName) {
        mqttKafkaService.closeConsumer(consumerName);
    }
}

View File

@@ -0,0 +1,18 @@
package com.muyu.mqttmessage.service;

import com.muyu.mqttmessage.common.Test;
import org.springframework.stereotype.Component;

/**
 * @ClassName MqttKafkaService
 * @Description
 * @Author Xin.Yao
 * @Date 2024/6/9 11:05
 */
public interface MqttKafkaService {
    void test(Test test);

    void consumer(Test test);

    void closeConsumer(String consumerName);
}

View File

@@ -0,0 +1,56 @@
package com.muyu.mqttmessage.service.impl;

import com.muyu.mqttmessage.common.Test;
import com.muyu.mqttmessage.consumer.KafkaConsumers;
import com.muyu.mqttmessage.service.MqttKafkaService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @ClassName MqttKafkaServiceImpl
 * @Description
 * @Author Xin.Yao
 * @Date 2024/6/9 11:05
 */
@Service
@Log4j2
public class MqttKafkaServiceImpl implements MqttKafkaService {
    // Shared across request threads (consumer() loops while closeConsumer()
    // removes the key), so a concurrent map is needed for visibility.
    private static final Map<String, Boolean> consumerMap = new ConcurrentHashMap<>();

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaConsumers kafkaConsumers;

    @Override
    public void test(Test test) {
        // Send the payload to the requested partition of the test topic
        kafkaTemplate.send("testKafka", test.getPartitions(), test.getKey(), test.getData());
    }

    @Override
    public void consumer(Test test) {
        KafkaConsumer<String, String> consumer = kafkaConsumers.kafkaConsumer(test);
        consumerMap.put(test.getConsumerName(), true);
        // Keep polling until closeConsumer() removes this consumer's flag
        while (consumerMap.containsKey(test.getConsumerName())) {
            ConsumerRecords<String, String> msg = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : msg) {
                log.info("{} received record: {}", test.getConsumerName(), consumerRecord.value());
            }
        }
        consumer.close();
    }

    @Override
    public void closeConsumer(String consumerName) {
        consumerMap.remove(consumerName);
    }
}
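The KafkaConsumers bean injected above, and the kafkaConsumer(Test) factory method it exposes, is referenced here but not part of this diff. To make the consumer loop easier to follow, here is a hedged sketch of a factory shape that would fit the usage above; the broker address, topic name and the choice of the consumer name as group id are assumptions, not the project's actual implementation.

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.stereotype.Component;

    // Hypothetical shape of KafkaConsumers.kafkaConsumer(Test), matching how it
    // is called in MqttKafkaServiceImpl. All concrete values are illustrative.
    @Component
    public class KafkaConsumers {
        public KafkaConsumer<String, String> kafkaConsumer(Test test) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, test.getConsumerName());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Pin the consumer to the partition named in the request so it reads back
            // what the /test/Test endpoint wrote there.
            consumer.assign(List.of(new TopicPartition("testKafka", test.getPartitions())));
            return consumer;
        }
    }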