diff --git a/pom.xml b/pom.xml index a26f366..86bd087 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,11 @@ 17 + + org.springframework.kafka + spring-kafka + + org.eclipse.paho org.eclipse.paho.client.mqttv3 @@ -44,10 +49,7 @@ org.springframework.boot spring-boot-starter-amqp - - org.springframework.kafka - spring-kafka - + org.projectlombok @@ -64,11 +66,6 @@ spring-rabbit-test test - - org.springframework.kafka - spring-kafka-test - test - diff --git a/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java b/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java index 14851ce..65c9bd0 100644 --- a/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java +++ b/src/main/java/com/muyu/mqttmessage/MqttMessageApplication.java @@ -2,6 +2,8 @@ package com.muyu.mqttmessage; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; @SpringBootApplication public class MqttMessageApplication { @@ -10,4 +12,5 @@ public class MqttMessageApplication { SpringApplication.run(MqttMessageApplication.class, args); } + } diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java index a29ca23..38ed4e0 100644 --- a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java +++ b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java @@ -60,17 +60,17 @@ public class MqttFactory { // } // } - public static void main(String[] args) { - MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222"); - MqttFactory.createMqttClient(mqttMessageModel1); - MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222"); - MqttFactory.createMqttClient(mqttMessageModel2); - } +// public static void main(String[] args) { +// MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222"); +// MqttFactory.createMqttClient(mqttMessageModel1); +// MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222"); +// MqttFactory.createMqttClient(mqttMessageModel2); +// } public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) { MqttClient client =null; int qos = 0; try { - client = new MqttClient(mqttMessageModel.getBroker(), mqttMessageModel.getClientId(), new MemoryPersistence()); + client = new MqttClient("tcp://"+mqttMessageModel.getBroker()+":1883", mqttMessageModel.getClientId(), new MemoryPersistence()); // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttMessageModel.getUsername()); diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java new file mode 100644 index 0000000..ca027c8 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java @@ -0,0 +1,32 @@ +package com.muyu.mqttmessage.config.kafkaconfig; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @ClassName CustomizePartitioner + * @Description kafka自定义分区 + * @Author Xin.Yao + * @Date 2024/6/7 下午8:05 + */ +@Component +public class CustomizePartitioner implements Partitioner { + @Override + public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { + //自定义分区规则,默认全部发送到0号分区 + return 0; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java new file mode 100644 index 0000000..0624064 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java @@ -0,0 +1,31 @@ +package com.muyu.mqttmessage.config.kafkaconfig; + +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.FailureCallback; +import org.springframework.util.concurrent.SuccessCallback; + +/** + * @ClassName SuccessCallback + * @Description kafka消息发送成功回调 + * @Author Xin.Yao + * @Date 2024/6/7 下午7:51 + */ +@Component +public class KafkaCallback implements SuccessCallback> , FailureCallback { + @Override + public void onSuccess(SendResult success) { + // 消息发送到的topic + String topic = success.getRecordMetadata().topic(); + // 消息发送到的分区 + int partition = success.getRecordMetadata().partition(); + // 消息在分区内的offset + long offset = success.getRecordMetadata().offset(); + System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); + } + + @Override + public void onFailure(Throwable ex) { + System.out.println("发送消息失败1:" + ex.getMessage()); + } +} diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java new file mode 100644 index 0000000..34d86f3 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java @@ -0,0 +1,59 @@ +package com.muyu.mqttmessage.config.kafkaconfig; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.ProducerListener; + + +/** + * @ClassName KafkaConfig + * @Description kafka消息监听器 + * @Author Xin.Yao + * @Date 2024/6/7 下午7:55 + */ +@Configuration +public class KafkaConfig { + + @Autowired + ProducerFactory producerFactory; + + @Bean + public KafkaTemplate kafkaTemplate() { + KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory); + kafkaTemplate.setProducerListener(new ProducerListener() { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + System.out.println("发送成功 " + producerRecord.toString()); + } + + @Override + public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) { + System.out.println("发送失败" + producerRecord.toString()); + System.out.println(exception.getMessage()); + } + +// @Override +// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) { +// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); +// } +// +// @Override +// public void onError(ProducerRecord producerRecord, Exception exception) { +// System.out.println("发送失败" + producerRecord.toString()); +// System.out.println(exception.getMessage()); +// } +// +// @Override +// public void onError(String topic, Integer partition, String key, Object value, Exception exception) { +// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); +// System.out.println(exception.getMessage()); +// } + }); + return kafkaTemplate; + } +} diff --git a/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java b/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java new file mode 100644 index 0000000..129e474 --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/constants/RabbitMqConstant.java @@ -0,0 +1,14 @@ +package com.muyu.mqttmessage.constants; + +import org.springframework.stereotype.Component; + +/** + * @ClassName RabbitMqConstant + * @Description rabbitmq常量 + * @Author Xin.Yao + * @Date 2024/6/2 上午9:36 + */ +@Component +public class RabbitMqConstant { + public static final String MQTT_MESSAGE_QUEUE = "mqttmessage"; +} diff --git a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java new file mode 100644 index 0000000..9b2da1c --- /dev/null +++ b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java @@ -0,0 +1,28 @@ +package com.muyu.mqttmessage.consumer; + +import com.alibaba.fastjson2.JSON; +import com.muyu.mqttmessage.common.MqttMessageModel; +import com.muyu.mqttmessage.config.MqttFactory; +import com.muyu.mqttmessage.constants.RabbitMqConstant; +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @ClassName RabbitConsumer + * @Description 描述 + * @Author Xin.Yao + * @Date 2024/6/6 上午9:35 + */ +@Component +@Log4j2 +public class RabbitConsumer { + @RabbitListener(queuesToDeclare = {@Queue(RabbitMqConstant.MQTT_MESSAGE_QUEUE)}) + public void monitorServer(String msg){ + log.info("监听到的消息:{}",msg); + MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class); + MqttFactory.createMqttClient(mqttMessageModel); + log.info("{}服务器监听连接成功",mqttMessageModel.getTopic()); + } +} diff --git a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java index 5e4a7bd..284e52d 100644 --- a/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java +++ b/src/main/java/com/muyu/mqttmessage/domain/VehicleData.java @@ -23,72 +23,58 @@ public class VehicleData { * VIN */ private String vin; - /** * 行驶路线 */ private String drivingRoute; - /** * 经度 */ private String longitude; - /** * 维度 */ private String latitude; - /** * 速度 */ private String speed; - /** * 里程 */ private BigDecimal mileage; - /** * 总电压 */ private String voltage; - /** * 总电流 */ private String current; - /** * 绝缘电阻 */ private String resistance; - /** * 档位 */ private String gear = "P"; - /** * 加速踏板行程值 */ private String accelerationPedal; - /** * 制动踏板行程值 */ private String brakePedal; - /** * 燃料消耗率 */ private String fuelConsumptionRate; - /** * 电机控制器温度 */ private String motorControllerTemperature; - /** * 电机转速 */ diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java index 9ed391c..efa01c8 100644 --- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java @@ -7,6 +7,8 @@ import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.math.BigDecimal; @@ -20,6 +22,9 @@ import java.math.BigDecimal; @Component @Log4j2 public class MqttCallBackServiceImpl implements MqttCallback { + @Autowired + private KafkaTemplate kafkaTemplate; + @Override public void connectionLost(Throwable cause) { System.out.println("connectionLost: " + cause.getMessage()); @@ -29,6 +34,7 @@ public class MqttCallBackServiceImpl implements MqttCallback { public void messageArrived(String topic, MqttMessage message) { log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload()))); log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))))); + kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString()); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b456e74..d9888ba 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -15,6 +15,44 @@ spring: prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条 publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue) - + kafka: + bootstrap-servers: 47.98.170.220:9092 #这个是kafka的地址,对应你server.properties中配置的 + producer: + batch-size: 16384 #批量大小 + acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) + retries: 10 # 消息发送重试次数 + #transaction-id-prefix: transaction + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + properties: + partitioner: + class: com.muyu.mqttmessage.config.kafkaconfig.CustomizePartitioner + linger: + ms: 2000 #提交延迟 + #partitioner: #指定分区器 + #class: pers.zhang.config.CustomerPartitionHandler + consumer: + group-id: testGroup #默认的消费组ID + enable-auto-commit: true #是否自动提交offset + auto-commit-interval: 2000 #提交offset延时 + # 当kafka中没有初始offset或offset超出范围时将自动重置offset + # earliest:重置为分区中最小的offset; + # latest:重置为分区中最新的offset(消费分区中新产生的数据); + # none:只要有一个分区不存在已提交的offset,就抛出异常; + auto-offset-reset: latest + max-poll-records: 500 #单次拉取消息的最大条数 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session: + timeout: + ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作) + request: + timeout: + ms: 18000 # 消费请求的超时时间 + listener: + missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错 + # type: batch server: port: 9005