From c98804b1f4775825e15794f9347316a3c159d4b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=91=E5=B9=B4=E6=A2=A6=E4=B8=8E=E7=A0=96?= <2847127106@qq.com> Date: Mon, 7 Oct 2024 11:34:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E6=88=90kafka=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E5=88=B0=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=8C=81=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processing/controller/TestController.java | 8 +-- .../impl/DataProcessingServiceImpl.java | 70 ++++++++++--------- .../branch/DataStorageProcessStrategy.java | 4 +- .../strategy/core/BasicStrategy.java | 2 +- .../strategy/leaves/DataStorageStrategy.java | 61 +++++++++++++++- 5 files changed, 102 insertions(+), 43 deletions(-) diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index 762975f..5704ff7 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -55,21 +55,21 @@ public class TestController { "key": "vin", "label": "VIN码", "type": "String", - "value": "vin131413534474" + "value": "vin999999" },{ "key": "timestamp", "label": "时间戳", - "type": "String", + "type": "long", "value": "1727534036893" },{ "key": "latitude", "label": "纬度", - "type": "String", + "type": "int", "value": "66.898" },{ "key": "longitude", "label": "经度", - "type": "String", + "type": "int", "value": "99.12" }]"""; ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 001ba6e..8d824bb 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -90,39 +90,43 @@ public class DataProcessingServiceImpl implements DataProcessingService { @Override public Object addCarData(HashMap hashMap) { - StringBuilder sql = new StringBuilder("insert into root.one."); - sql.append(hashMap.get("firmCode").getValue()) - .append(".") - .append(hashMap.get("vin").getValue()) - .append("("); - hashMap.remove("firmCode"); - hashMap.remove("vin"); - StringBuilder keys = new StringBuilder(); - StringBuilder values = new StringBuilder(); - hashMap.keySet().forEach(key -> { - keys.append(key).append(","); - if ("String".equals(hashMap.get(key).getType())) { - values.append("'") - .append(hashMap.get(key).getValue()) - .append("'") - .append(","); - }else { - values.append(hashMap.get(key).getValue()) - .append(","); - } - }); - sql.append(keys.substring(0, keys.length() - 1)) - .append(") values (") - .append(values.substring(0, values.length() - 1)) - .append(")"); - try { - sessionPool.executeNonQueryStatement(sql.toString()); - } catch (StatementExecutionException e) { - throw new RuntimeException(e); - } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); - } - return sql; +// StringBuilder sql = new StringBuilder("insert into root.one."); +// sql.append(hashMap.get("firmCode").getValue()) +// .append(".") +// .append(hashMap.get("vin").getValue()) +// .append("("); +// hashMap.remove("firmCode"); +// hashMap.remove("vin"); +// StringBuilder keys = new StringBuilder(); +// StringBuilder values = new StringBuilder(); +// hashMap.keySet().forEach(key -> { +// if (hashMap.get(key) != null) { +// keys.append(key).append(","); +// if ("String".equals(hashMap.get(key).getType())) { +// values.append("'") +// .append(hashMap.get(key).getValue()) +// .append("'") +// .append(","); +// }else { +// values.append(hashMap.get(key).getValue()) +// .append(","); +// } +// } +// }); +// sql.append(keys.substring(0, keys.length() - 1)) +// .append(") values (") +// .append(values.substring(0, values.length() - 1)) +// .append(")"); +// try { +// sessionPool.executeNonQueryStatement(sql.toString()); +// } catch (StatementExecutionException e) { +// throw new RuntimeException(e); +// } catch (IoTDBConnectionException e) { +// throw new RuntimeException(e); +// } +// log.info("成功执行sql语句: [{}]", sql); +// return sql; + return null; } private static CarData getCarData(String vin, RowRecord next, List columnNames) { diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java index 3072a47..47112da 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java @@ -29,7 +29,7 @@ public class DataStorageProcessStrategy extends abstractStrategyRouter NECESSARY_PARAM = new HashMap<>(); static { - NECESSARY_PARAM.put("vin","VIN码"); + NECESSARY_PARAM.put("VIN","VIN码"); NECESSARY_PARAM.put("timestamp","时间戳"); NECESSARY_PARAM.put("longitude","经度"); NECESSARY_PARAM.put("latitude","纬度"); @@ -39,7 +39,7 @@ public class DataStorageProcessStrategy extends abstractStrategyRouter, Temporary2> registerStrategy() { return param-> { // 判断是否存在问题 - if (!param.containsKey("DataStorageProcessStrategy")) { + if (param.containsKey("DataStorageProcessStrategy")) { log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey()); param.remove("DataStorageProcessStrategy"); return new EndStrategy(); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java index 7ca35a7..d67112b 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java @@ -32,7 +32,7 @@ public class BasicStrategy extends abstractStrategyRouter basicDataMap) { log.info("开始执行基础校验节点。。。"); - basicDataMap.put(CacheNameEnums.WARMING.getCode(), null); + basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null); basicDataMap.put(CacheNameEnums.FAULT.getCode(), null); basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null); log.info("基础校验节点已通过。。。"); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java index f0287f4..62aa625 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java @@ -1,13 +1,23 @@ package com.muyu.data.processing.strategy.leaves; -import com.muyu.common.caffeine.enums.CacheNameEnums;import com.muyu.data.processing.domain.BasicData; +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.iotdb.config.IotDBSessionConfig; +import com.muyu.data.processing.domain.BasicData; import java.util.HashMap; import java.util.List; import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.service.DataProcessingService; +import com.muyu.data.processing.service.impl.DataProcessingServiceImpl; import com.muyu.data.processing.strategy.StrategyHandler; import com.muyu.data.processing.strategy.abstractStrategyRouter; import com.muyu.data.processing.strategy.core.RoutingStrategy; +import jakarta.annotation.Resource; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -24,15 +34,60 @@ import org.springframework.stereotype.Component; @Component public class DataStorageStrategy extends abstractStrategyRouter, Temporary2> implements StrategyHandler, Temporary2> { + + private SessionPool sessionPool = new IotDBSessionConfig().getSessionPool(); + @Override protected StrategyMapper, Temporary2> registerStrategy() { - return param -> new RoutingStrategy(); + return param -> { + log.info("数据持久化节点已通过。。。"); + return new RoutingStrategy(); + }; } @Override public Temporary2 apply(HashMap basicDataMap) { - log.info("数据持久化节点已通过。。。"); basicDataMap.remove(CacheNameEnums.STORAGE.getCode()); + // 执行持久化方法 + addCarData(basicDataMap); return applyStrategy(basicDataMap); } + + private void addCarData(HashMap hashMap) { + StringBuilder sql = new StringBuilder("insert into root.one."); + sql.append(hashMap.get("firmCode").getValue()) + .append(".") + .append(hashMap.get("VIN").getValue()) + .append("("); + hashMap.remove("firmCode"); + hashMap.remove("VIN"); + StringBuilder keys = new StringBuilder(); + StringBuilder values = new StringBuilder(); + hashMap.keySet().forEach(key -> { + if (hashMap.get(key) != null) { + keys.append(key).append(","); + if ("String".equals(hashMap.get(key).getType())) { + values.append("'") + .append(hashMap.get(key).getValue()) + .append("'") + .append(","); + }else { + values.append(hashMap.get(key).getValue()) + .append(","); + } + } + }); + sql.append(keys.substring(0, keys.length() - 1)) + .append(") values (") + .append(values.substring(0, values.length() - 1)) + .append(")"); + try { + sessionPool.executeNonQueryStatement(sql.toString()); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + log.info("成功执行sql语句: [{}]", sql); + } }