From 25bc628f8f6e94ab9617e704cfedcad4169f04c6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E8=85=BE?= <3467447354@qq.com>
Date: Sun, 29 Sep 2024 01:17:24 +0800
Subject: [PATCH] =?UTF-8?q?=E6=97=B6=E5=BA=8F=E6=80=A7=E6=95=B0=E6=8D=AE?=
=?UTF-8?q?=E5=BA=93=E6=B5=8B=E8=AF=95=E6=88=90=E5=8A=9F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-modules/cloud-modules-carData/pom.xml | 11 +-
.../config/kafkaconfig/KafkaConfig.java | 54 ++++-
.../lotdbconfig/IotDBSessionConfig.java | 220 +-----------------
.../carData/consumer/MyKafkaConsumer.java | 40 +++-
.../java/com/muyu/carData/pojo/Student.java | 6 +-
.../CacheController.java} | 4 +-
.../testcontroller/IotDBController.java | 111 +++++++++
.../KafkaProducerController.java | 50 ++++
8 files changed, 273 insertions(+), 223 deletions(-)
rename cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/{controller/TestController.java => testcontroller/CacheController.java} (94%)
create mode 100644 cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/testcontroller/IotDBController.java
create mode 100644 cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/testcontroller/KafkaProducerController.java
diff --git a/cloud-modules/cloud-modules-carData/pom.xml b/cloud-modules/cloud-modules-carData/pom.xml
index 1dc7382..c1ab988 100644
--- a/cloud-modules/cloud-modules-carData/pom.xml
+++ b/cloud-modules/cloud-modules-carData/pom.xml
@@ -72,11 +72,6 @@
cloud-common-api-doc
-
- org.apache.kafka
- kafka-clients
- 3.0.0
-
com.muyu
cloud-common-core
@@ -87,7 +82,11 @@
caffeine
2.9.3
-
+
+ org.apache.kafka
+ kafka-clients
+ 3.0.0
+
org.apache.iotdb
iotdb-session
diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/kafkaconfig/KafkaConfig.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/kafkaconfig/KafkaConfig.java
index 4f4dac3..23b395b 100644
--- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/kafkaconfig/KafkaConfig.java
+++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/kafkaconfig/KafkaConfig.java
@@ -1,6 +1,9 @@
package com.muyu.carData.config.kafkaconfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -19,6 +22,55 @@ public class KafkaConfig {
@Bean
public KafkaProducer kafkaProducer(){
HashMap configs = new HashMap<>();
- return null;
+ configs.put("bootstrap.servers","60.204.221.52:9092");
+ configs.put("retries",2);
+ configs.put("batch.size",16384);
+ //生产者可用于缓冲等待发送到服务器的记录的总内存字节数
+ configs.put("buffer-memory",3554432);
+ /**
+ *生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
+ *acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送
+ * .在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1.
+ *acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,
+ * 在follower节点复制前,leader节点确认记录后立即失败的话,记录将会丢失.
+ */
+ configs.put("acks","-1");
+ //指定key使用的序列化类
+ StringSerializer keySerializer = new StringSerializer();
+ //指定value使用的序列化类
+ StringSerializer valueSerializer = new StringSerializer();
+ //创建kafka生产者
+ return new KafkaProducer(configs, keySerializer, valueSerializer);
+ }
+
+ @Bean
+ public KafkaConsumer kafkaConsumer(){
+ HashMap configs = new HashMap<>();
+ configs.put("bootstrap.servers","60.204.221.52:9092");
+ //开启consumer的偏移量(offset)自动提交到kafka
+ configs.put("enable.auto.commit",true);
+ //偏移量自动提交的时间间隔,单位毫秒
+ configs.put("auto.commit.interval",5000);
+ //在Kafka中没有初始化偏移量或者当前偏移量不存在情况
+ //earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量
+ //latest, 在偏移量无效的情况下, 自动重置为最新的偏移量
+ //none, 在偏移量无效的情况下, 抛出异常.
+ configs.put("auto.offset.reset","latest");
+ //请求阻塞的最大时间(毫秒)
+ configs.put("fetch.max.wait",500);
+ //请求应答的最小字节数
+ configs.put("fetch.min.size",1);
+ //心跳间隔时间(毫秒)
+ configs.put("heartbeat-interval",3000);
+ //一次调用poll返回的最大记录条数
+ configs.put("max.poll.records",500);
+ //指定消费组
+ configs.put("group.id","firstGroup");
+ //指定key使用的反序列化类
+ StringDeserializer keyDeserializer = new StringDeserializer();
+ //指定value使用的反序列化类
+ StringDeserializer valueDeserializer = new StringDeserializer();
+ //创建kafka消费者
+ return new KafkaConsumer(configs,keyDeserializer,valueDeserializer);
}
}
diff --git a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java
index d73354f..9561bb3 100644
--- a/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java
+++ b/cloud-modules/cloud-modules-carData/src/main/java/com/muyu/carData/config/lotdbconfig/IotDBSessionConfig.java
@@ -1,20 +1,15 @@
package com.muyu.carData.config.lotdbconfig;
-import com.muyu.carData.domain.IoTDBRecord;
-import com.muyu.carData.interfaces.IoTDBRecordable;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.session.util.Version;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.session.pool.SessionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -61,216 +56,25 @@ public class IotDBSessionConfig {
@Value("${spring.iotdb.maxSize}")
private Integer maxSize;
- private static Session session;
/**
- * 获取session iotdb数据库初始化
- * @return
+ * 默认每次查询的条数
*/
- public Session getSession(){
+ @Value("${spring.iotdb.fetchSize}")
+ private int fetchSize;
- if (null == session){
- try {
- logger.info("正在连接iotdb......");
- session = new Session.Builder()
- .host(ip)
- .port(port)
- .username(username)
- .password(password)
- .version(Version.V_0_13)
- .build();
- IotDBSessionConfig.session.open(false);
- session.setFetchSize(maxSize);
- //设置时区
- session.setTimeZone("+08:00");
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- logger.error(String.valueOf(e.getCause()));
- }
+ @Bean
+ public Session iotSession(){
+ Session session = new Session(ip, port, username, password);
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ logger.error(String.valueOf(e.getCause()));
}
return session;
}
- /**
- * 节点路径
- * @param records
- * @return
- * @throws Exception
- */
- private List getDeviceIds(List extends IoTDBRecordable> records) throws Exception {
- List deviceIds = new ArrayList<>();
- for (IoTDBRecordable record : records) {
- IoTDBRecord ioTDBRecord = record.toRecord();
- String deviceId = ioTDBRecord.getDeviceId();
- deviceIds.add(deviceId);
- }
- return deviceIds;
- }
-
- /**
- * 时间戳
- * @param records
- * @return
- * @throws Exception
- */
- private List getTimes(List extends IoTDBRecordable> records) throws Exception {
- List times = new ArrayList<>();
- for (IoTDBRecordable record : records) {
- long time = record.toRecord().getTime();
- times.add(time);
- }
- return times;
- }
-
- /**
- * 物理量 即属性
- * @param records
- * @return
- * @throws Exception
- */
- private List> getMeasurementsList(List extends IoTDBRecordable> records) throws Exception {
- List> measurementList = new ArrayList<>();
- for (IoTDBRecordable record : records) {
- List iotDBRecord = record.toRecord().getMeasurementList();
- measurementList.add(iotDBRecord);
- }
- return measurementList;
- }
-
- /**
- * 属性 必须和属性值一一对应
- * @param records
- * @return
- * @throws Exception
- */
- public List> getValueList(List extends IoTDBRecordable> records) throws Exception {
- List> valueList = new ArrayList<>();
- for (IoTDBRecordable record : records) {
- List