feat: 新增使用kafka发送消息
parent
8382839122
commit
ece9b8a42e
15
pom.xml
15
pom.xml
|
@ -17,6 +17,11 @@
|
||||||
<java.version>17</java.version>
|
<java.version>17</java.version>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.paho</groupId>
|
<groupId>org.eclipse.paho</groupId>
|
||||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
@ -44,10 +49,7 @@
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.projectlombok</groupId>
|
<groupId>org.projectlombok</groupId>
|
||||||
|
@ -64,11 +66,6 @@
|
||||||
<artifactId>spring-rabbit-test</artifactId>
|
<artifactId>spring-rabbit-test</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka-test</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -2,6 +2,8 @@ package com.muyu.mqttmessage;
|
||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class MqttMessageApplication {
|
public class MqttMessageApplication {
|
||||||
|
@ -10,4 +12,5 @@ public class MqttMessageApplication {
|
||||||
SpringApplication.run(MqttMessageApplication.class, args);
|
SpringApplication.run(MqttMessageApplication.class, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,17 +60,17 @@ public class MqttFactory {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
public static void main(String[] args) {
|
// public static void main(String[] args) {
|
||||||
MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
|
// MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
|
||||||
MqttFactory.createMqttClient(mqttMessageModel1);
|
// MqttFactory.createMqttClient(mqttMessageModel1);
|
||||||
MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
|
// MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
|
||||||
MqttFactory.createMqttClient(mqttMessageModel2);
|
// MqttFactory.createMqttClient(mqttMessageModel2);
|
||||||
}
|
// }
|
||||||
public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
|
public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
|
||||||
MqttClient client =null;
|
MqttClient client =null;
|
||||||
int qos = 0;
|
int qos = 0;
|
||||||
try {
|
try {
|
||||||
client = new MqttClient(mqttMessageModel.getBroker(), mqttMessageModel.getClientId(), new MemoryPersistence());
|
client = new MqttClient("tcp://"+mqttMessageModel.getBroker()+":1883", mqttMessageModel.getClientId(), new MemoryPersistence());
|
||||||
// 连接参数
|
// 连接参数
|
||||||
MqttConnectOptions options = new MqttConnectOptions();
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
options.setUserName(mqttMessageModel.getUsername());
|
options.setUserName(mqttMessageModel.getUsername());
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
package com.muyu.mqttmessage.config.kafkaconfig;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.Partitioner;
|
||||||
|
import org.apache.kafka.common.Cluster;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName CustomizePartitioner
|
||||||
|
* @Description kafka自定义分区
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/7 下午8:05
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class CustomizePartitioner implements Partitioner {
|
||||||
|
@Override
|
||||||
|
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
|
||||||
|
//自定义分区规则,默认全部发送到0号分区
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> map) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package com.muyu.mqttmessage.config.kafkaconfig;
|
||||||
|
|
||||||
|
import org.springframework.kafka.support.SendResult;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.concurrent.FailureCallback;
|
||||||
|
import org.springframework.util.concurrent.SuccessCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName SuccessCallback
|
||||||
|
* @Description kafka消息发送成功回调
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/7 下午7:51
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class KafkaCallback implements SuccessCallback<SendResult<String, Object>> , FailureCallback {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult<String, Object> success) {
|
||||||
|
// 消息发送到的topic
|
||||||
|
String topic = success.getRecordMetadata().topic();
|
||||||
|
// 消息发送到的分区
|
||||||
|
int partition = success.getRecordMetadata().partition();
|
||||||
|
// 消息在分区内的offset
|
||||||
|
long offset = success.getRecordMetadata().offset();
|
||||||
|
System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable ex) {
|
||||||
|
System.out.println("发送消息失败1:" + ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package com.muyu.mqttmessage.config.kafkaconfig;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.kafka.support.ProducerListener;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName KafkaConfig
|
||||||
|
* @Description kafka消息监听器
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/7 下午7:55
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class KafkaConfig {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
ProducerFactory producerFactory;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, Object> kafkaTemplate() {
|
||||||
|
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
|
||||||
|
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
|
||||||
|
System.out.println("发送成功 " + producerRecord.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
|
||||||
|
System.out.println("发送失败" + producerRecord.toString());
|
||||||
|
System.out.println(exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Override
|
||||||
|
// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
|
||||||
|
// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Override
|
||||||
|
// public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
|
||||||
|
// System.out.println("发送失败" + producerRecord.toString());
|
||||||
|
// System.out.println(exception.getMessage());
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Override
|
||||||
|
// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
|
||||||
|
// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
|
||||||
|
// System.out.println(exception.getMessage());
|
||||||
|
// }
|
||||||
|
});
|
||||||
|
return kafkaTemplate;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.muyu.mqttmessage.constants;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName RabbitMqConstant
|
||||||
|
* @Description rabbitmq常量
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/2 上午9:36
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RabbitMqConstant {
|
||||||
|
public static final String MQTT_MESSAGE_QUEUE = "mqttmessage";
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package com.muyu.mqttmessage.consumer;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.muyu.mqttmessage.common.MqttMessageModel;
|
||||||
|
import com.muyu.mqttmessage.config.MqttFactory;
|
||||||
|
import com.muyu.mqttmessage.constants.RabbitMqConstant;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @ClassName RabbitConsumer
|
||||||
|
* @Description 描述
|
||||||
|
* @Author Xin.Yao
|
||||||
|
* @Date 2024/6/6 上午9:35
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Log4j2
|
||||||
|
public class RabbitConsumer {
|
||||||
|
@RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)})
|
||||||
|
public void monitorServer(String msg){
|
||||||
|
log.info("监听到的消息:{}",msg);
|
||||||
|
MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
|
||||||
|
MqttFactory.createMqttClient(mqttMessageModel);
|
||||||
|
log.info("{}服务器监听连接成功",mqttMessageModel.getTopic());
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,72 +23,58 @@ public class VehicleData {
|
||||||
* VIN
|
* VIN
|
||||||
*/
|
*/
|
||||||
private String vin;
|
private String vin;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 行驶路线
|
* 行驶路线
|
||||||
*/
|
*/
|
||||||
private String drivingRoute;
|
private String drivingRoute;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 经度
|
* 经度
|
||||||
*/
|
*/
|
||||||
private String longitude;
|
private String longitude;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 维度
|
* 维度
|
||||||
*/
|
*/
|
||||||
private String latitude;
|
private String latitude;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 速度
|
* 速度
|
||||||
*/
|
*/
|
||||||
private String speed;
|
private String speed;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 里程
|
* 里程
|
||||||
*/
|
*/
|
||||||
private BigDecimal mileage;
|
private BigDecimal mileage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 总电压
|
* 总电压
|
||||||
*/
|
*/
|
||||||
private String voltage;
|
private String voltage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 总电流
|
* 总电流
|
||||||
*/
|
*/
|
||||||
private String current;
|
private String current;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 绝缘电阻
|
* 绝缘电阻
|
||||||
*/
|
*/
|
||||||
private String resistance;
|
private String resistance;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 档位
|
* 档位
|
||||||
*/
|
*/
|
||||||
private String gear = "P";
|
private String gear = "P";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 加速踏板行程值
|
* 加速踏板行程值
|
||||||
*/
|
*/
|
||||||
private String accelerationPedal;
|
private String accelerationPedal;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 制动踏板行程值
|
* 制动踏板行程值
|
||||||
*/
|
*/
|
||||||
private String brakePedal;
|
private String brakePedal;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 燃料消耗率
|
* 燃料消耗率
|
||||||
*/
|
*/
|
||||||
private String fuelConsumptionRate;
|
private String fuelConsumptionRate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 电机控制器温度
|
* 电机控制器温度
|
||||||
*/
|
*/
|
||||||
private String motorControllerTemperature;
|
private String motorControllerTemperature;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 电机转速
|
* 电机转速
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -7,6 +7,8 @@ import lombok.extern.log4j.Log4j2;
|
||||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
@ -20,6 +22,9 @@ import java.math.BigDecimal;
|
||||||
@Component
|
@Component
|
||||||
@Log4j2
|
@Log4j2
|
||||||
public class MqttCallBackServiceImpl implements MqttCallback {
|
public class MqttCallBackServiceImpl implements MqttCallback {
|
||||||
|
@Autowired
|
||||||
|
private KafkaTemplate<String,Object> kafkaTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionLost(Throwable cause) {
|
public void connectionLost(Throwable cause) {
|
||||||
System.out.println("connectionLost: " + cause.getMessage());
|
System.out.println("connectionLost: " + cause.getMessage());
|
||||||
|
@ -29,6 +34,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
|
||||||
public void messageArrived(String topic, MqttMessage message) {
|
public void messageArrived(String topic, MqttMessage message) {
|
||||||
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
|
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
|
||||||
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
|
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
|
||||||
|
kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -15,6 +15,44 @@ spring:
|
||||||
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
|
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
|
||||||
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
|
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
|
||||||
publisher-returns: true #确认消息已发送到队列(Queue)
|
publisher-returns: true #确认消息已发送到队列(Queue)
|
||||||
|
kafka:
|
||||||
|
bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的
|
||||||
|
producer:
|
||||||
|
batch-size: 16384 #批量大小
|
||||||
|
acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
|
||||||
|
retries: 10 # 消息发送重试次数
|
||||||
|
#transaction-id-prefix: transaction
|
||||||
|
buffer-memory: 33554432
|
||||||
|
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
properties:
|
||||||
|
partitioner:
|
||||||
|
class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner
|
||||||
|
linger:
|
||||||
|
ms: 2000 #提交延迟
|
||||||
|
#partitioner: #指定分区器
|
||||||
|
#class: pers.zhang.config.CustomerPartitionHandler
|
||||||
|
consumer:
|
||||||
|
group-id: testGroup #默认的消费组ID
|
||||||
|
enable-auto-commit: true #是否自动提交offset
|
||||||
|
auto-commit-interval: 2000 #提交offset延时
|
||||||
|
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
|
||||||
|
# earliest:重置为分区中最小的offset;
|
||||||
|
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
|
||||||
|
# none:只要有一个分区不存在已提交的offset,就抛出异常;
|
||||||
|
auto-offset-reset: latest
|
||||||
|
max-poll-records: 500 #单次拉取消息的最大条数
|
||||||
|
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
properties:
|
||||||
|
session:
|
||||||
|
timeout:
|
||||||
|
ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
|
||||||
|
request:
|
||||||
|
timeout:
|
||||||
|
ms: 18000 # 消费请求的超时时间
|
||||||
|
listener:
|
||||||
|
missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
|
||||||
|
# type: batch
|
||||||
server:
|
server:
|
||||||
port: 9005
|
port: 9005
|
||||||
|
|
Loading…
Reference in New Issue