feat: 解析报文后发送到kafka分区中并消费
parent
d635bca11e
commit
7d2f40d6f0
11
pom.xml
11
pom.xml
|
@ -21,6 +21,17 @@
|
||||||
<groupId>org.springframework.kafka</groupId>
|
<groupId>org.springframework.kafka</groupId>
|
||||||
<artifactId>spring-kafka</artifactId>
|
<artifactId>spring-kafka</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
<version>3.3.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.paho</groupId>
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
|
|
@ -7,6 +7,7 @@ import org.eclipse.paho.client.mqttv3.*;
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -18,62 +19,8 @@ import org.springframework.stereotype.Service;
|
||||||
@Service
|
@Service
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class MqttFactory {
|
public class MqttFactory {
|
||||||
// @Autowired
|
private MqttCallBackServiceImpl mqttCallBackService;
|
||||||
// private static KafkaTemplate<String,Object> kafkaTemplate;
|
public MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
|
||||||
//
|
|
||||||
// public static void main(String[] args) {
|
|
||||||
// kafkaTemplate.send("testKafka","测试");
|
|
||||||
// }
|
|
||||||
private static MqttCallBackServiceImpl mqttCallBackService;
|
|
||||||
// public static void main(String[] args){
|
|
||||||
// String broker = "tcp://43.142.44.217:1883";
|
|
||||||
// String topic = "mqtt001";
|
|
||||||
// String username = "emqx";
|
|
||||||
// String password = "public";
|
|
||||||
// String clientid = "subscribe_client";
|
|
||||||
// int qos = 0;
|
|
||||||
//
|
|
||||||
// try {
|
|
||||||
// MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
|
|
||||||
// // 连接参数
|
|
||||||
// MqttConnectOptions options = new MqttConnectOptions();
|
|
||||||
// options.setUserName(username);
|
|
||||||
// options.setPassword(password.toCharArray());
|
|
||||||
// options.setConnectionTimeout(60);
|
|
||||||
// options.setKeepAliveInterval(60);
|
|
||||||
// // 设置回调
|
|
||||||
// client.setCallback(new MqttCallback() {
|
|
||||||
//
|
|
||||||
// public void connectionLost(Throwable cause) {
|
|
||||||
// System.out.println("connectionLost: " + cause.getMessage());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// public void messageArrived(String topic, MqttMessage message) {
|
|
||||||
// System.out.println("topic: " + topic);
|
|
||||||
// System.out.println("Qos: " + message.getQos());
|
|
||||||
// System.out.println("message content: " + new String(message.getPayload()));
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// public void deliveryComplete(IMqttDeliveryToken token) {
|
|
||||||
// System.out.println("deliveryComplete---------" + token.isComplete());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// });
|
|
||||||
// client.connect(options);
|
|
||||||
// client.subscribe(topic, qos);
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// e.printStackTrace();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// public static void main(String[] args) {
|
|
||||||
// MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
|
|
||||||
// MqttFactory.createMqttClient(mqttMessageModel1);
|
|
||||||
// MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
|
|
||||||
// MqttFactory.createMqttClient(mqttMessageModel2);
|
|
||||||
// }
|
|
||||||
public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
|
|
||||||
MqttClient client =null;
|
MqttClient client =null;
|
||||||
int qos = 0;
|
int qos = 0;
|
||||||
try {
|
try {
|
||||||
|
@ -84,7 +31,7 @@ public class MqttFactory {
|
||||||
options.setPassword(mqttMessageModel.getPassword().toCharArray());
|
options.setPassword(mqttMessageModel.getPassword().toCharArray());
|
||||||
options.setConnectionTimeout(60);
|
options.setConnectionTimeout(60);
|
||||||
options.setKeepAliveInterval(60);
|
options.setKeepAliveInterval(60);
|
||||||
client.setCallback(new MqttCallBackServiceImpl());
|
client.setCallback(mqttCallBackService);
|
||||||
client.connect(options);
|
client.connect(options);
|
||||||
client.subscribe(mqttMessageModel.getTopic(), qos);
|
client.subscribe(mqttMessageModel.getTopic(), qos);
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package com.muyu.mqttmessage.config.kafkaconfig;
|
package com.muyu.mqttmessage.config.kafkaconfig;
|
||||||
|
|
||||||
|
import lombok.extern.log4j.Log4j;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -16,44 +18,46 @@ import org.springframework.kafka.support.ProducerListener;
|
||||||
* @Author Xin.Yao
|
* @Author Xin.Yao
|
||||||
* @Date 2024/6/7 下午7:55
|
* @Date 2024/6/7 下午7:55
|
||||||
*/
|
*/
|
||||||
//@Configuration
|
@Configuration
|
||||||
//public class KafkaConfig {
|
@Log4j2
|
||||||
//
|
public class KafkaConfig {
|
||||||
// @Autowired
|
|
||||||
// ProducerFactory producerFactory;
|
@Autowired
|
||||||
//
|
ProducerFactory producerFactory;
|
||||||
// @Bean
|
|
||||||
// public KafkaTemplate<String, Object> kafkaTemplate() {
|
@Bean
|
||||||
// KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
|
public KafkaTemplate<String, Object> kafkaTemplate() {
|
||||||
// kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
|
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
|
||||||
|
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
|
||||||
|
log.info("发送成功: {}" ,producerRecord.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
|
||||||
|
log.error("发送失败: {}",producerRecord.toString());
|
||||||
|
log.error(exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
// @Override
|
// @Override
|
||||||
// public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
|
// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
|
||||||
// System.out.println("发送成功 " + producerRecord.toString());
|
// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// @Override
|
// @Override
|
||||||
// public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
|
// public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
|
||||||
// System.out.println("发送失败" + producerRecord.toString());
|
// System.out.println("发送失败" + producerRecord.toString());
|
||||||
// System.out.println(exception.getMessage());
|
// System.out.println(exception.getMessage());
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
//// @Override
|
// @Override
|
||||||
//// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
|
// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
|
||||||
//// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
||||||
//// }
|
// System.out.println(exception.getMessage());
|
||||||
////
|
// }
|
||||||
//// @Override
|
});
|
||||||
//// public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
|
return kafkaTemplate;
|
||||||
//// System.out.println("发送失败" + producerRecord.toString());
|
}
|
||||||
//// System.out.println(exception.getMessage());
|
}
|
||||||
//// }
|
|
||||||
////
|
|
||||||
//// @Override
|
|
||||||
//// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
|
|
||||||
//// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
|
||||||
//// System.out.println(exception.getMessage());
|
|
||||||
//// }
|
|
||||||
// });
|
|
||||||
// return kafkaTemplate;
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package com.muyu.mqttmessage.consumer;
|
||||||
|
|
||||||
|
import com.muyu.mqttmessage.common.Test;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName KafkaConsumers
|
||||||
|
* @Description 描述
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/9 上午9:54
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class KafkaConsumers {
|
||||||
|
@Value("${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapServers;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public KafkaConsumer kafkaConsumer(Test test){
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
|
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
|
// 指定分区策略
|
||||||
|
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
|
||||||
|
// 指定消费者组,必须参数
|
||||||
|
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
|
||||||
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
|
||||||
|
// 订阅主题分区
|
||||||
|
List<TopicPartition> topicPartitions = new ArrayList<>();
|
||||||
|
topicPartitions.add(new TopicPartition("testKafka", test.getPartitions()));
|
||||||
|
consumer.assign(topicPartitions);
|
||||||
|
return consumer;
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package com.muyu.mqttmessage.service.impl;
|
package com.muyu.mqttmessage.service.impl;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.muyu.mqttmessage.common.Test;
|
||||||
import com.muyu.mqttmessage.domain.VehicleData;
|
import com.muyu.mqttmessage.domain.VehicleData;
|
||||||
import com.muyu.mqttmessage.utils.ConversionUtil;
|
import com.muyu.mqttmessage.utils.ConversionUtil;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -10,6 +11,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
@ -22,7 +24,14 @@ import java.math.BigDecimal;
|
||||||
@Component
|
@Component
|
||||||
@Log4j2
|
@Log4j2
|
||||||
public class MqttCallBackServiceImpl implements MqttCallback {
|
public class MqttCallBackServiceImpl implements MqttCallback {
|
||||||
// @Autowired
|
private KafkaTemplate<String,Object> kafkaTemplate;
|
||||||
|
|
||||||
|
public MqttCallBackServiceImpl(KafkaTemplate<String, Object> kafkaTemplate) {
|
||||||
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// @Autowired
|
||||||
// private KafkaTemplate<String,Object> kafkaTemplate;
|
// private KafkaTemplate<String,Object> kafkaTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -32,9 +41,19 @@ public class MqttCallBackServiceImpl implements MqttCallback {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageArrived(String topic, MqttMessage message) {
|
public void messageArrived(String topic, MqttMessage message) {
|
||||||
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
|
try {
|
||||||
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
|
VehicleData vehicleData = getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())));
|
||||||
// kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString());
|
String jsonString = JSON.toJSONString(vehicleData);
|
||||||
|
log.info("转化为对象:{}",jsonString);
|
||||||
|
Test test = new Test();
|
||||||
|
test.setPartitions(1);
|
||||||
|
test.setKey("123");
|
||||||
|
test.setData(jsonString);
|
||||||
|
kafkaTemplate.send("testKafka",test.getPartitions(),test.getKey(),test.getData());
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -15,44 +15,57 @@ spring:
|
||||||
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
|
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
|
||||||
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
|
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
|
||||||
publisher-returns: true #确认消息已发送到队列(Queue)
|
publisher-returns: true #确认消息已发送到队列(Queue)
|
||||||
|
# kafka:
|
||||||
|
# bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
|
||||||
|
# producer:
|
||||||
|
# batch-size: 16384 #批量大小
|
||||||
|
# acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
|
||||||
|
# retries: 10 # 消息发送重试次数
|
||||||
|
# #transaction-id-prefix: transaction
|
||||||
|
# buffer-memory: 33554432
|
||||||
|
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
# properties:
|
||||||
|
# partitioner:
|
||||||
|
# class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
|
||||||
|
# linger:
|
||||||
|
# ms: 2000 #提交延迟
|
||||||
|
# #partitioner: #指定分区器
|
||||||
|
# #class: pers.zhang.config.CustomerPartitionHandler
|
||||||
|
# consumer:
|
||||||
|
# group-id: testGroup #默认的消费组ID
|
||||||
|
# enable-auto-commit: true #是否自动提交offset
|
||||||
|
# auto-commit-interval: 2000 #提交offset延时
|
||||||
|
# # 当kafka中没有初始offset或offset超出范围时将自动重置offset
|
||||||
|
# # earliest:重置为分区中最小的offset;
|
||||||
|
# # latest:重置为分区中最新的offset(消费分区中新产生的数据);
|
||||||
|
# # none:只要有一个分区不存在已提交的offset,就抛出异常;
|
||||||
|
# auto-offset-reset: latest
|
||||||
|
# max-poll-records: 500 #单次拉取消息的最大条数
|
||||||
|
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
# properties:
|
||||||
|
# session:
|
||||||
|
# timeout:
|
||||||
|
# ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
|
||||||
|
# request:
|
||||||
|
# timeout:
|
||||||
|
# ms: 18000 # 消费请求的超时时间
|
||||||
|
# listener:
|
||||||
|
# missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
|
||||||
|
# # type: batch
|
||||||
|
|
||||||
kafka:
|
kafka:
|
||||||
bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
|
#config/consumer.properties配置的bootstrap.servers
|
||||||
|
bootstrap-servers: 47.98.170.220:9092
|
||||||
producer:
|
producer:
|
||||||
batch-size: 16384 #批量大小
|
|
||||||
acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
|
|
||||||
retries: 10 # 消息发送重试次数
|
|
||||||
#transaction-id-prefix: transaction
|
|
||||||
buffer-memory: 33554432
|
|
||||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
properties:
|
|
||||||
partitioner:
|
|
||||||
class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
|
|
||||||
linger:
|
|
||||||
ms: 2000 #提交延迟
|
|
||||||
#partitioner: #指定分区器
|
|
||||||
#class: pers.zhang.config.CustomerPartitionHandler
|
|
||||||
consumer:
|
consumer:
|
||||||
group-id: testGroup #默认的消费组ID
|
|
||||||
enable-auto-commit: true #是否自动提交offset
|
|
||||||
auto-commit-interval: 2000 #提交offset延时
|
|
||||||
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
|
|
||||||
# earliest:重置为分区中最小的offset;
|
|
||||||
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
|
|
||||||
# none:只要有一个分区不存在已提交的offset,就抛出异常;
|
|
||||||
auto-offset-reset: latest
|
|
||||||
max-poll-records: 500 #单次拉取消息的最大条数
|
|
||||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
properties:
|
#这个可以和config/consumer.properties里的group.id不同
|
||||||
session:
|
group-id: test-consumer-group
|
||||||
timeout:
|
|
||||||
ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
|
|
||||||
request:
|
|
||||||
timeout:
|
|
||||||
ms: 18000 # 消费请求的超时时间
|
|
||||||
listener:
|
|
||||||
missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
|
|
||||||
# type: batch
|
|
||||||
server:
|
server:
|
||||||
port: 9005
|
port: 9005
|
||||||
|
|
Loading…
Reference in New Issue