feat: 测试kafka使用

master
yaoxin 2024-06-08 11:59:23 +08:00
parent ece9b8a42e
commit d635bca11e
6 changed files with 84 additions and 75 deletions

View File

@ -5,6 +5,8 @@ import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -16,7 +18,12 @@ import org.springframework.stereotype.Service;
@Service @Service
@AllArgsConstructor @AllArgsConstructor
public class MqttFactory { public class MqttFactory {
// @Autowired
// private static KafkaTemplate<String,Object> kafkaTemplate;
//
// public static void main(String[] args) {
// kafkaTemplate.send("testKafka","测试");
// }
private static MqttCallBackServiceImpl mqttCallBackService; private static MqttCallBackServiceImpl mqttCallBackService;
// public static void main(String[] args){ // public static void main(String[] args){
// String broker = "tcp://43.142.44.217:1883"; // String broker = "tcp://43.142.44.217:1883";

View File

@ -12,21 +12,21 @@ import java.util.Map;
* @Author Xin.Yao * @Author Xin.Yao
* @Date 2024/6/7 8:05 * @Date 2024/6/7 8:05
*/ */
@Component //@Component
public class CustomizePartitioner implements Partitioner { //public class CustomizePartitioner implements Partitioner {
@Override // @Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//自定义分区规则默认全部发送到0号分区 // //自定义分区规则默认全部发送到0号分区
return 0; // return 0;
} // }
//
@Override // @Override
public void close() { // public void close() {
//
} // }
//
@Override // @Override
public void configure(Map<String, ?> map) { // public void configure(Map<String, ?> map) {
//
} // }
} //}

View File

@ -11,21 +11,21 @@ import org.springframework.util.concurrent.SuccessCallback;
* @Author Xin.Yao * @Author Xin.Yao
* @Date 2024/6/7 7:51 * @Date 2024/6/7 7:51
*/ */
@Component //@Component
public class KafkaCallback implements SuccessCallback<SendResult<String, Object>> , FailureCallback { //public class KafkaCallback implements SuccessCallback<SendResult<String, Object>> , FailureCallback {
@Override // @Override
public void onSuccess(SendResult<String, Object> success) { // public void onSuccess(SendResult<String, Object> success) {
// 消息发送到的topic // // 消息发送到的topic
String topic = success.getRecordMetadata().topic(); // String topic = success.getRecordMetadata().topic();
// 消息发送到的分区 // // 消息发送到的分区
int partition = success.getRecordMetadata().partition(); // int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset // // 消息在分区内的offset
long offset = success.getRecordMetadata().offset(); // long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset); // System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
} // }
//
@Override // @Override
public void onFailure(Throwable ex) { // public void onFailure(Throwable ex) {
System.out.println("发送消息失败1:" + ex.getMessage()); // System.out.println("发送消息失败1:" + ex.getMessage());
} // }
} //}

View File

@ -16,44 +16,44 @@ import org.springframework.kafka.support.ProducerListener;
* @Author Xin.Yao * @Author Xin.Yao
* @Date 2024/6/7 7:55 * @Date 2024/6/7 7:55
*/ */
@Configuration //@Configuration
public class KafkaConfig { //public class KafkaConfig {
//
@Autowired // @Autowired
ProducerFactory producerFactory; // ProducerFactory producerFactory;
//
@Bean // @Bean
public KafkaTemplate<String, Object> kafkaTemplate() { // public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory); // KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() { // kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
System.out.println("发送成功 " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
System.out.println("发送失败" + producerRecord.toString());
System.out.println(exception.getMessage());
}
// @Override // @Override
// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) { // public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); // System.out.println("发送成功 " + producerRecord.toString());
// } // }
// //
// @Override // @Override
// public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) { // public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
// System.out.println("发送失败" + producerRecord.toString()); // System.out.println("发送失败" + producerRecord.toString());
// System.out.println(exception.getMessage()); // System.out.println(exception.getMessage());
// } // }
// //
// @Override //// @Override
// public void onError(String topic, Integer partition, String key, Object value, Exception exception) { //// public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value); //// System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
// System.out.println(exception.getMessage()); //// }
// } ////
}); //// @Override
return kafkaTemplate; //// public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
} //// System.out.println("发送失败" + producerRecord.toString());
} //// System.out.println(exception.getMessage());
//// }
////
//// @Override
//// public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
//// System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);
//// System.out.println(exception.getMessage());
//// }
// });
// return kafkaTemplate;
// }
//}

View File

@ -5,6 +5,7 @@ import com.muyu.mqttmessage.common.MqttMessageModel;
import com.muyu.mqttmessage.config.MqttFactory; import com.muyu.mqttmessage.config.MqttFactory;
import com.muyu.mqttmessage.constants.RabbitMqConstant; import com.muyu.mqttmessage.constants.RabbitMqConstant;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -22,7 +23,8 @@ public class RabbitConsumer {
public void monitorServer(String msg){ public void monitorServer(String msg){
log.info("监听到的消息:{}",msg); log.info("监听到的消息:{}",msg);
MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class); MqttMessageModel mqttMessageModel = JSON.parseObject(msg, MqttMessageModel.class);
MqttFactory.createMqttClient(mqttMessageModel); MqttClient mqttClient = MqttFactory.createMqttClient(mqttMessageModel);
log.info("{}服务器监听连接成功",mqttMessageModel.getTopic()); log.info("{}服务器监听连接成功",mqttMessageModel.getTopic());
} }
} }

View File

@ -22,8 +22,8 @@ import java.math.BigDecimal;
@Component @Component
@Log4j2 @Log4j2
public class MqttCallBackServiceImpl implements MqttCallback { public class MqttCallBackServiceImpl implements MqttCallback {
@Autowired // @Autowired
private KafkaTemplate<String,Object> kafkaTemplate; // private KafkaTemplate<String,Object> kafkaTemplate;
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
@ -34,7 +34,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload()))); log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))))); log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString()); // kafkaTemplate.send("testKafka",getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload()))).toString());
} }
@Override @Override
@ -45,7 +45,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
public VehicleData getVehicleData(String message) { public VehicleData getVehicleData(String message) {
message = message.substring(1,message.length()-2); message = message.substring(1,message.length()-2);
return VehicleData.builder() return VehicleData.builder()
//17 //vin
.vin(message.substring(0,17)) .vin(message.substring(0,17))
// 当前时间戳 13 // 当前时间戳 13
.drivingRoute(message.substring(17,30)) .drivingRoute(message.substring(17,30))