feat 存储kafka

new-master
rouchen 2024-06-09 21:09:53 +08:00
parent 5dd19eedcd
commit dcb15cd9e9
6 changed files with 62 additions and 6 deletions

View File

@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EntryPointsManager">
<list size="1">
<item index="0" class="java.lang.String" itemvalue="org.springframework.beans.factory.annotation.Autowired" />
</list>
</component>
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">

11
pom.xml
View File

@ -23,6 +23,17 @@
</parent>
<dependencies>
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -0,0 +1,16 @@
package com.muyu.kafka;
/**
* PartitionerProducer
*
* @author Yangle
* Date 2024/6/9 18:41
*/
public class PartitionerProducer {
}

View File

@ -1,9 +1,15 @@
package com.muyu.mqtt;
import com.alibaba.fastjson.JSON;
import com.muyu.utils.ConversionUtil;
import com.muyu.vehicle.MessageData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
@ -14,6 +20,9 @@ import org.springframework.stereotype.Service;
*/
@Service
public class MessageCallbackService implements MqttCallback {
@Autowired
private KafkaTemplate kafkaTemplate;
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost:"+cause.getMessage());
@ -25,7 +34,12 @@ public class MessageCallbackService implements MqttCallback {
System.out.println("Qos:"+mqttMessage.getQos());
System.out.println("message content:"+new String(mqttMessage.getPayload()));
String s = new String(mqttMessage.getPayload());
ConversionUtil.main(s);
MessageData main = ConversionUtil.main(s);
String vin = main.getVin();
ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(vin,main.toString());
kafkaTemplate.send(stringObjectProducerRecord);
}
@Override

View File

@ -1,14 +1,15 @@
package com.muyu.utils;
import com.muyu.vehicle.MessageData;
import com.muyu.vehicle.VehicleData;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.KafkaClient;
import org.springframework.beans.factory.annotation.Autowired;
import java.nio.charset.StandardCharsets;
@Log4j2
public class ConversionUtil {
/**
* 16
* @param s
@ -25,7 +26,7 @@ public class ConversionUtil {
return sb.toString();
}
public static void main (String args) {
public static MessageData main (String args) {
// String str = "<?xml version=\"1.0\"?>\n" +
// "<monitorRoot type=\"param\"><synchronizeSyptom event=\"0\" initial=\"true\"><Action_ECG><Rhythm>Sinus</Rhythm><HR>80</HR><EMD>No Change</EMD><Conduct>0</Conduct></Action_ECG><Action_Osat value=\"94\" isRelativePercent=\"false\"/><Action_BP isRelativePercent=\"false\"><Shrink value=\"120\"/><Stretch value=\"80\"/></Action_BP><Action_Resp breathType=\"Normal\" value=\"14\" isRelativePercent=\"false\"/><Action_etCO2 value=\"34\" isRelativePercent=\"false\"/><Action_Temperature value=\"35.2\"/><Action_CVP value=\"6.0\"/><Action_PAPDia value=\"10\"/><Action_PAPSys value=\"25\"/><Action_WP value=\"9\"/></synchronizeSyptom></monitorRoot>";
// String strToSixteen = strToSixteen(str);
@ -322,7 +323,9 @@ public class ConversionUtil {
.chg(chg)
.build();
log.error("报文解析:{}",build);
System.out.println(hexStringToString.length());
System.out.println(hexStringToString.length());
return build;
}
/**

View File

@ -93,7 +93,14 @@ forest:
adminHost: ${mqtt.admin.host}
adminTopicUri: ${mqtt.admin.topic-uri}
log-enabled: false
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
acks: all
retries: 0
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 服务器配置
mqtt:
server: