From ece9b8a42e04f1e2831cb3f0c7899e4eb45b0f27 Mon Sep 17 00:00:00 2001
From: yaoxin <1752800946@qq.com>
Date: Fri, 7 Jun 2024 22:32:18 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E4=BD=BF=E7=94=A8kaf?=
=?UTF-8?q?ka=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 15 ++---
.../mqttmessage/MqttMessageApplication.java | 3 +
.../muyu/mqttmessage/config/MqttFactory.java | 14 ++---
.../kafkaconfig/CustomizePartitioner.java | 32 ++++++++++
.../config/kafkaconfig/KafkaCallback.java | 31 ++++++++++
.../config/kafkaconfig/KafkaConfig.java | 59 +++++++++++++++++++
.../constants/RabbitMqConstant.java | 14 +++++
.../mqttmessage/consumer/RabbitConsumer.java | 28 +++++++++
.../muyu/mqttmessage/domain/VehicleData.java | 14 -----
.../service/impl/MqttCallBackServiceImpl.java | 6 ++
src/main/resources/application.yml | 40 ++++++++++++-
11 files changed, 225 insertions(+), 31 deletions(-)
create mode 100644 src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java
create mode 100644 src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java
create mode 100644 src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java
create mode 100644 src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java
create mode 100644 src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
diff --git a/pom.xml b/pom.xml
index a26f366..86bd087 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,11 @@
17
+
+ org.springframework.kafka
+ spring-kafka
+
+
org.eclipse.paho
org.eclipse.paho.client.mqttv3
@@ -44,10 +49,7 @@
org.springframework.boot
spring-boot-starter-amqp
-
- org.springframework.kafka
- spring-kafka
-
+
org.projectlombok
@@ -64,11 +66,6 @@
spring-rabbit-test
test
-
- org.springframework.kafka
- spring-kafka-test
- test
-
diff --git a/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java b/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java
index 14851ce..65c9bd0 100644
--- a/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java
+++ b/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java
@@ -2,6 +2,8 @@ package com.muyu.mqttmessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class MqttMessageApplication {
@@ -10,4 +12,5 @@ public class MqttMessageApplication {
SpringApplication.run(MqttMessageApplication.class, args);
}
+
}
diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java
index a29ca23..38ed4e0 100644
--- a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java
+++ b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java
@@ -60,17 +60,17 @@ public class MqttFactory {
// }
// }
- public static void main(String[] args) {
- MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
- MqttFactory.createMqttClient(mqttMessageModel1);
- MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
- MqttFactory.createMqttClient(mqttMessageModel2);
- }
+// public static void main(String[] args) {
+// MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
+// MqttFactory.createMqttClient(mqttMessageModel1);
+// MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
+// MqttFactory.createMqttClient(mqttMessageModel2);
+// }
public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
MqttClient client =null;
int qos = 0;
try {
- client = new MqttClient(mqttMessageModel.getBroker(), mqttMessageModel.getClientId(), new MemoryPersistence());
+ client = new MqttClient("tcp://"+mqttMessageModel.getBroker()+":1883", mqttMessageModel.getClientId(), new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttMessageModel.getUsername());
diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java
new file mode 100644
index 0000000..ca027c8
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java
@@ -0,0 +1,32 @@
+package com.muyu.mqttmessage.config.kafkaconfig;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * @ClassName CustomizePartitioner
+ * @Description kafka自定义分区
+ * @Author Xin.Yao
+ * @Date 2024/6/7 下午8:05
+ */
+@Component
+public class CustomizePartitioner implements Partitioner {
+ @Override
+ public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
+ //自定义分区规则,默认全部发送到0号分区
+ return 0;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map map) {
+
+ }
+}
diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java
new file mode 100644
index 0000000..0624064
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java
@@ -0,0 +1,31 @@
+package com.muyu.mqttmessage.config.kafkaconfig;
+
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.FailureCallback;
+import org.springframework.util.concurrent.SuccessCallback;
+
+/**
+ * @ClassName SuccessCallback
+ * @Description kafka消息发送成功回调
+ * @Author Xin.Yao
+ * @Date 2024/6/7 下午7:51
+ */
+@Component
+public class KafkaCallback implements SuccessCallback> , FailureCallback {
+ @Override
+ public void onSuccess(SendResult success) {
+ // 消息发送到的topic
+ String topic = success.getRecordMetadata().topic();
+ // 消息发送到的分区
+ int partition = success.getRecordMetadata().partition();
+ // 消息在分区内的offset
+ long offset = success.getRecordMetadata().offset();
+ System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ System.out.println("发送消息失败1:" + ex.getMessage());
+ }
+}
diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java
new file mode 100644
index 0000000..34d86f3
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java
@@ -0,0 +1,59 @@
+package com.muyu.mqttmessage.config.kafkaconfig;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.ProducerListener;
+
+
+/**
+ * @ClassName KafkaConfig
+ * @Description kafka消息监听器
+ * @Author Xin.Yao
+ * @Date 2024/6/7 下午7:55
+ */
+@Configuration
+public class KafkaConfig {
+
+ @Autowired
+ ProducerFactory producerFactory;
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
+ kafkaTemplate.setProducerListener(new ProducerListener() {
+ @Override
+ public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
+ System.out.println("发送成功 " + producerRecord.toString());
+ }
+
+ @Override
+ public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
+ System.out.println("发送失败" + producerRecord.toString());
+ System.out.println(exception.getMessage());
+ }
+
+// @Override
+// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
+// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
+// }
+//
+// @Override
+// public void onError(ProducerRecord producerRecord, Exception exception) {
+// System.out.println("发送失败" + producerRecord.toString());
+// System.out.println(exception.getMessage());
+// }
+//
+// @Override
+// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
+// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
+// System.out.println(exception.getMessage());
+// }
+ });
+ return kafkaTemplate;
+ }
+}
diff --git a/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java b/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java
new file mode 100644
index 0000000..129e474
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java
@@ -0,0 +1,14 @@
+package com.muyu.mqttmessage.constants;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * @ClassName RabbitMqConstant
+ * @Description rabbitmq常量
+ * @Author Xin.Yao
+ * @Date 2024/6/2 上午9:36
+ */
+@Component
+public class RabbitMqConstant {
+ public static final String MQTT_MESSAGE_QUEUE = "mqttmessage";
+}
diff --git a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
new file mode 100644
index 0000000..9b2da1c
--- /dev/null
+++ b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java
@@ -0,0 +1,28 @@
+package com.muyu.mqttmessage.consumer;
+
+import com.alibaba.fastjson2.JSON;
+import com.muyu.mqttmessage.common.MqttMessageModel;
+import com.muyu.mqttmessage.config.MqttFactory;
+import com.muyu.mqttmessage.constants.RabbitMqConstant;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @ClassName RabbitConsumer
+ * @Description 描述
+ * @Author Xin.Yao
+ * @Date 2024/6/6 上午9:35
+ */
+@Component
+@Log4j2
+public class RabbitConsumer {
+ @RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)})
+ public void monitorServer(String msg){
+ log.info("监听到的消息:{}",msg);
+ MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
+ MqttFactory.createMqttClient(mqttMessageModel);
+ log.info("{}服务器监听连接成功",mqttMessageModel.getTopic());
+ }
+}
diff --git a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
index 5e4a7bd..284e52d 100644
--- a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
+++ b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java
@@ -23,72 +23,58 @@ public class VehicleData {
* VIN
*/
private String vin;
-
/**
* 行驶路线
*/
private String drivingRoute;
-
/**
* 经度
*/
private String longitude;
-
/**
* 维度
*/
private String latitude;
-
/**
* 速度
*/
private String speed;
-
/**
* 里程
*/
private BigDecimal mileage;
-
/**
* 总电压
*/
private String voltage;
-
/**
* 总电流
*/
private String current;
-
/**
* 绝缘电阻
*/
private String resistance;
-
/**
* 档位
*/
private String gear = "P";
-
/**
* 加速踏板行程值
*/
private String accelerationPedal;
-
/**
* 制动踏板行程值
*/
private String brakePedal;
-
/**
* 燃料消耗率
*/
private String fuelConsumptionRate;
-
/**
* 电机控制器温度
*/
private String motorControllerTemperature;
-
/**
* 电机转速
*/
diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
index 9ed391c..efa01c8 100644
--- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
+++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java
@@ -7,6 +7,8 @@ import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@@ -20,6 +22,9 @@ import java.math.BigDecimal;
@Component
@Log4j2
public class MqttCallBackServiceImpl implements MqttCallback {
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
@@ -29,6 +34,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) {
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
+ kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString());
}
@Override
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b456e74..d9888ba 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -15,6 +15,44 @@ spring:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
-
+ kafka:
+ bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
+ producer:
+ batch-size: 16384 #批量大小
+ acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
+ retries: 10 # 消息发送重试次数
+ #transaction-id-prefix: transaction
+ buffer-memory: 33554432
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ properties:
+ partitioner:
+ class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
+ linger:
+ ms: 2000 #提交延迟
+ #partitioner: #指定分区器
+ #class: pers.zhang.config.CustomerPartitionHandler
+ consumer:
+ group-id: testGroup #默认的消费组ID
+ enable-auto-commit: true #是否自动提交offset
+ auto-commit-interval: 2000 #提交offset延时
+ # 当kafka中没有初始offset或offset超出范围时将自动重置offset
+ # earliest:重置为分区中最小的offset;
+ # latest:重置为分区中最新的offset(消费分区中新产生的数据);
+ # none:只要有一个分区不存在已提交的offset,就抛出异常;
+ auto-offset-reset: latest
+ max-poll-records: 500 #单次拉取消息的最大条数
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ properties:
+ session:
+ timeout:
+ ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
+ request:
+ timeout:
+ ms: 18000 # 消费请求的超时时间
+ listener:
+ missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
+ # type: batch
server:
port: 9005