diff --git a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java index 38ed4e0..225742c 100644 --- a/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java +++ b/src/main/java/com/muyu/mqttmessage/config/MqttFactory.java @@ -5,6 +5,8 @@ import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl; import lombok.AllArgsConstructor; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; /** @@ -16,7 +18,12 @@ import org.springframework.stereotype.Service; @Service @AllArgsConstructor public class MqttFactory { - +// @Autowired +// private static KafkaTemplate kafkaTemplate; +// +// public static void main(String[] args) { +// kafkaTemplate.send("testKafka","测试"); +// } private static MqttCallBackServiceImpl mqttCallBackService; // public static void main(String[] args){ // String broker = "tcp://43.142.44.217:1883"; diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java index ca027c8..2eef41c 100644 --- a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/CustomizePartitioner.java @@ -12,21 +12,21 @@ import java.util.Map; * @Author Xin.Yao * @Date 2024/6/7 下午8:05 */ -@Component -public class CustomizePartitioner implements Partitioner { - @Override - public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { - //自定义分区规则,默认全部发送到0号分区 - return 0; - } - - @Override - public void close() { - - } - - @Override - public void configure(Map map) { - - } -} +//@Component +//public class CustomizePartitioner implements Partitioner { +// @Override +// public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { +// //自定义分区规则,默认全部发送到0号分区 +// return 0; +// } +// +// @Override +// public void close() { +// +// } +// +// @Override +// public void configure(Map map) { +// +// } +//} diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java index 0624064..31321df 100644 --- a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaCallback.java @@ -11,21 +11,21 @@ import org.springframework.util.concurrent.SuccessCallback; * @Author Xin.Yao * @Date 2024/6/7 下午7:51 */ -@Component -public class KafkaCallback implements SuccessCallback> , FailureCallback { - @Override - public void onSuccess(SendResult success) { - // 消息发送到的topic - String topic = success.getRecordMetadata().topic(); - // 消息发送到的分区 - int partition = success.getRecordMetadata().partition(); - // 消息在分区内的offset - long offset = success.getRecordMetadata().offset(); - System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); - } - - @Override - public void onFailure(Throwable ex) { - System.out.println("发送消息失败1:" + ex.getMessage()); - } -} +//@Component +//public class KafkaCallback implements SuccessCallback> , FailureCallback { +// @Override +// public void onSuccess(SendResult success) { +// // 消息发送到的topic +// String topic = success.getRecordMetadata().topic(); +// // 消息发送到的分区 +// int partition = success.getRecordMetadata().partition(); +// // 消息在分区内的offset +// long offset = success.getRecordMetadata().offset(); +// System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); +// } +// +// @Override +// public void onFailure(Throwable ex) { +// System.out.println("发送消息失败1:" + ex.getMessage()); +// } +//} diff --git a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java index 34d86f3..d6c2bce 100644 --- a/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java +++ b/src/main/java/com/muyu/mqttmessage/config/kafkaconfig/KafkaConfig.java @@ -16,44 +16,44 @@ import org.springframework.kafka.support.ProducerListener; * @Author Xin.Yao * @Date 2024/6/7 下午7:55 */ -@Configuration -public class KafkaConfig { - - @Autowired - ProducerFactory producerFactory; - - @Bean - public KafkaTemplate kafkaTemplate() { - KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory); - kafkaTemplate.setProducerListener(new ProducerListener() { - @Override - public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { - System.out.println("发送成功 " + producerRecord.toString()); - } - - @Override - public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) { - System.out.println("发送失败" + producerRecord.toString()); - System.out.println(exception.getMessage()); - } - +//@Configuration +//public class KafkaConfig { +// +// @Autowired +// ProducerFactory producerFactory; +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory); +// kafkaTemplate.setProducerListener(new ProducerListener() { // @Override -// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) { -// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); +// public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { +// System.out.println("发送成功 " + producerRecord.toString()); // } // // @Override -// public void onError(ProducerRecord producerRecord, Exception exception) { +// public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) { // System.out.println("发送失败" + producerRecord.toString()); // System.out.println(exception.getMessage()); // } // -// @Override -// public void onError(String topic, Integer partition, String key, Object value, Exception exception) { -// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); -// System.out.println(exception.getMessage()); -// } - }); - return kafkaTemplate; - } -} +//// @Override +//// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) { +//// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); +//// } +//// +//// @Override +//// public void onError(ProducerRecord producerRecord, Exception exception) { +//// System.out.println("发送失败" + producerRecord.toString()); +//// System.out.println(exception.getMessage()); +//// } +//// +//// @Override +//// public void onError(String topic, Integer partition, String key, Object value, Exception exception) { +//// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); +//// System.out.println(exception.getMessage()); +//// } +// }); +// return kafkaTemplate; +// } +//} diff --git a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java index 9b2da1c..4e77091 100644 --- a/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java +++ b/src/main/java/com/muyu/mqttmessage/consumer/RabbitConsumer.java @@ -5,6 +5,7 @@ import com.muyu.mqttmessage.common.MqttMessageModel; import com.muyu.mqttmessage.config.MqttFactory; import com.muyu.mqttmessage.constants.RabbitMqConstant; import lombok.extern.log4j.Log4j2; +import org.eclipse.paho.client.mqttv3.MqttClient; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -22,7 +23,8 @@ public class RabbitConsumer { public void monitorServer(String msg){ log.info("监听到的消息:{}",msg); MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class); - MqttFactory.createMqttClient(mqttMessageModel); + MqttClient mqttClient = MqttFactory.createMqttClient(mqttMessageModel); log.info("{}服务器监听连接成功",mqttMessageModel.getTopic()); } + } diff --git a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java index efa01c8..2ab1996 100644 --- a/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java +++ b/src/main/java/com/muyu/mqttmessage/service/impl/MqttCallBackServiceImpl.java @@ -22,8 +22,8 @@ import java.math.BigDecimal; @Component @Log4j2 public class MqttCallBackServiceImpl implements MqttCallback { - @Autowired - private KafkaTemplate kafkaTemplate; +// @Autowired +// private KafkaTemplate kafkaTemplate; @Override public void connectionLost(Throwable cause) { @@ -34,7 +34,7 @@ public class MqttCallBackServiceImpl implements MqttCallback { public void messageArrived(String topic, MqttMessage message) { log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload()))); log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))))); - kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString()); +// kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString()); } @Override @@ -45,7 +45,7 @@ public class MqttCallBackServiceImpl implements MqttCallback { public VehicleData getVehicleData(String message) { message = message.substring(1,message.length()-2); return VehicleData.builder() - //17 + //vin .vin(message.substring(0,17)) // 当前时间戳 13 .drivingRoute(message.substring(17,30))