From 5bb57ecbb260398abedb6999b3db3b631430476e Mon Sep 17 00:00:00 2001 From: chentaisen <14615430+chentaisen@user.noreply.gitee.com> Date: Mon, 7 Oct 2024 20:51:58 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-common/cloud-common-caffeine/pom.xml | 35 ++++ .../common/caffeine/bean/CaffeineManager.java | 51 ++++++ .../caffeine/constents/CaffeineContent.java | 17 ++ .../common/caffeine/enums/CacheNameEnums.java | 68 ++++++++ .../caffeine/utils/CaffeineCacheUtils.java | 97 ++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + cloud-common/cloud-common-iotdb/pom.xml | 36 ++++ cloud-common/cloud-common-kafka/pom.xml | 37 ++++ cloud-common/pom.xml | 4 + .../cloud-modules-data-processing/pom.xml | 155 ++++++++++++++++ .../data/processing/MyDataApplication.java | 32 ++++ .../controller/DataProcessingController.java | 100 +++++++++++ .../processing/controller/TestController.java | 165 ++++++++++++++++++ .../data/processing/domain/BasicData.java | 36 ++++ .../muyu/data/processing/domain/CarData.java | 25 +++ .../data/processing/domain/IotDbData.java | 35 ++++ .../data/processing/domain/Temporary2.java | 22 +++ .../kafka/KafkaConsumerService.java | 71 ++++++++ .../mapper/DataProcessingMapper.java | 35 ++++ .../rebbit/DownlineRabbitConsumer.java | 74 ++++++++ .../rebbit/GoOnlineRabbitConsumer.java | 74 ++++++++ .../service/DataProcessingService.java | 36 ++++ .../impl/DataProcessingServiceImpl.java | 148 ++++++++++++++++ .../processing/strategy/StrategyHandler.java | 24 +++ .../strategy/abstractStrategyRouter.java | 60 +++++++ .../branch/DataStorageProcessStrategy.java | 63 +++++++ .../branch/FaultJudgmentStrategy.java | 38 ++++ .../branch/FaultProcessingStrategy.java | 37 ++++ .../branch/FenceJudgmentStrategy.java | 38 ++++ .../branch/FenceProcessingStrategy.java | 36 ++++ .../branch/RealTimeJudgmentStrategy.java | 38 ++++ .../branch/RealTimeProcessingStrategy.java | 36 ++++ .../branch/WarningJudgmentStrategy.java | 38 ++++ .../branch/WarningProcessingStrategy.java | 36 ++++ .../strategy/core/BasicStrategy.java | 41 +++++ .../processing/strategy/core/EndStrategy.java | 29 +++ .../strategy/core/RoutingStrategy.java | 59 +++++++ .../strategy/core/StartStrategy.java | 47 +++++ .../strategy/leaves/DataStorageStrategy.java | 90 ++++++++++ .../strategy/leaves/FaultAlarmStrategy.java | 38 ++++ .../strategy/leaves/FenceAlarmStrategy.java | 38 ++++ .../leaves/RealTimeAlarmStrategy.java | 38 ++++ .../strategy/leaves/WarningAlarmStrategy.java | 38 ++++ .../muyu/data/processing/utils/DataUtils.java | 28 +++ .../src/main/resources/banner.txt | 2 + .../src/main/resources/bootstrap.yml | 72 ++++++++ .../src/main/resources/logback/dev.xml | 74 ++++++++ .../src/main/resources/logback/prod.xml | 81 +++++++++ .../src/main/resources/logback/test.xml | 81 +++++++++ .../processing/DataProcessingMapper.xml | 38 ++++ cloud-modules/pom.xml | 1 + pom.xml | 2 +- 52 files changed, 2595 insertions(+), 1 deletion(-) create mode 100644 cloud-common/cloud-common-caffeine/pom.xml create mode 100644 cloud-common/cloud-common-iotdb/pom.xml create mode 100644 cloud-common/cloud-common-kafka/pom.xml create mode 100644 cloud-modules/cloud-modules-data-processing/pom.xml create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary2.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/abstractStrategyRouter.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/banner.txt create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/bootstrap.yml create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/logback/dev.xml create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/logback/prod.xml create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/logback/test.xml create mode 100644 cloud-modules/cloud-modules-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml diff --git a/cloud-common/cloud-common-caffeine/pom.xml b/cloud-common/cloud-common-caffeine/pom.xml new file mode 100644 index 0000000..286b121 --- /dev/null +++ b/cloud-common/cloud-common-caffeine/pom.xml @@ -0,0 +1,35 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-caffeine + + cloud-common-caffeine caffeine缓存模块 + + + 17 + 17 + UTF-8 + + + + + com.muyu + cloud-common-redis + + + + com.github.ben-manes.caffeine + caffeine + + + + + diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java index e69de29..8ccc11f 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java @@ -0,0 +1,51 @@ +package com.muyu.common.caffeine.bean; + + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.cache.CacheManager; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +/** + * Caffeine管理器 + * @Author: 胡杨 + * @Name: CaffeineCacheConfig + * @Description: Caffeine管理器 + * @CreatedDate: 2024/9/26 上午11:52 + * @FilePath: com.muyu.common.caffeine.config + */ + +@Slf4j +@Component +public class CaffeineManager { + + /** + * 创建缓存管理器 + * @return 缓存管理器实例 + */ + @Bean + public SimpleCacheManager simpleCacheManager() { + SimpleCacheManager cacheManager = new SimpleCacheManager(); + List cacheNames = CacheNameEnums.getCodes(); + cacheManager.setCaches(cacheNames.stream() + .map(name -> new CaffeineCache( + name, + Caffeine.newBuilder() + .recordStats() + .build())) + .toList()); + log.info("缓存管理器初始化完成,缓存分区:{}", cacheNames); + return cacheManager; + } + +} diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java index e69de29..251bc9b 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java @@ -0,0 +1,17 @@ +package com.muyu.common.caffeine.constents; + +/** + * Caffeine常量 + * @Author: 胡杨 + * @Name: CaffeineContent + * @Description: Caffeine常量 + * @CreatedDate: 2024/9/26 下午12:06 + * @FilePath: com.muyu.common.caffeine.constents + */ + +public class CaffeineContent { + + public static final String CAR_VIN_KEY = "car:vin"; + + public static final String VIN = "vin"; +} diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java index e69de29..3d3116b 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/enums/CacheNameEnums.java @@ -0,0 +1,68 @@ +package com.muyu.common.caffeine.enums; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.List; + +/** + * 缓存分区枚举 + * + * @Author: 胡杨 + * @Name: CacheNameEnums + * @Description: 缓存分区枚举 + * @CreatedDate: 2024/10/2 上午9:17 + * @FilePath: com.muyu.common.caffeine.enums + */ + +@Getter +public enum CacheNameEnums { + STORAGE("storage", "持久化"), + FAULT("fault", "故障"), + FENCE("fence", "围栏"), + WARMING("warming", "预警"), + REALTIME("realTime", "实时信息"); + + private final String code; + private final String info; + + CacheNameEnums(String code, String info) { + this.code = code; + this.info = info; + } + + /** + * 鉴别参数是否是枚举的值 + * + * @param code 需鉴别参数 + * @return 如果存在返回结果turn, 否则返回false + */ + public static boolean isCode(String code) { + return Arrays.stream(values()) + .map(CacheNameEnums::getCode) + .anyMatch(c -> c.equals(code)); + } + + /** + * 获取枚举Value + * @param code 编码 + * @return Value + */ + public static String getInfo(String code) { + return Arrays.stream(values()) + .filter(c -> c.getCode().equals(code)) + .map(CacheNameEnums::getInfo) + .findFirst() + .orElse(""); + } + + /** + * 获取所有code + * @return code集合 + */ + public static List getCodes() { + return Arrays.stream(values()) + .map(CacheNameEnums::getCode) + .toList(); + } +} diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java index e69de29..9064bf9 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/utils/CaffeineCacheUtils.java @@ -0,0 +1,97 @@ +package com.muyu.common.caffeine.utils; + + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.muyu.common.caffeine.constents.CaffeineContent; +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.redis.service.RedisService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.cache.CacheManager; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Caffeine缓存工具 + * @Author: 胡杨 + * @Name: CaffeineUtils + * @Description: 缓存工具类 + * @CreatedDate: 2024/9/26 下午2:53 + * @FilePath: com.muyu.common.caffeine + */ +@Slf4j +@Component +public class CaffeineCacheUtils { + @Resource + private CacheManager cacheManager; + @Resource + private RedisTemplate redisTemplate; + + + /** + * 车辆上线 - 新增缓存 + */ + public void addCarCache(String vin) { + // 从Redis中获取缓存信息 + for (String name : CacheNameEnums.getCodes()) { + String value = redisTemplate.opsForValue().get(name+":"+vin); + cacheManager.getCache(name).put(vin, value); + log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); + } + log.info("车辆编码:{},本地缓存完成...",vin); + } + + /** + * 车辆下线 - 删除缓存 + */ + public void deleteCarCache(String cacheName) { + if (!hasCarVinCache(cacheName,null)) { + log.warn("车辆编码:{},本地缓存不存在该车辆信息...", cacheName); + return; + } + cacheManager.getCache(cacheName).invalidate(); + log.info("车辆编码:{},本地缓存删除完成...", cacheName); + } + + /** + * 获取车辆信息缓存 + */ + public Object getCarCache(String cacheName, String key) { + if (!hasCarVinCache(cacheName, key)){ + log.warn("车辆编码:{},本地缓存不存在该车辆信息...",cacheName); + return null; + } + return cacheManager.getCache(cacheName).get(key).get(); + } + + /** + * 获取车辆信息缓存 + */ + public T getCarCache(String cacheName, String key, Class type) { + if (!hasCarVinCache(cacheName,key)){ + log.warn("车辆编码:{},本地缓存不存在该车辆信息...",cacheName); + return null; + } + return cacheManager.getCache(cacheName).get(key, type); + } + + /** + * 判断缓存存在与否 + */ + public Boolean hasCarVinCache(String cacheName,String key) { + boolean notEmpty = ObjectUtils.isNotEmpty(cacheManager.getCache(cacheName)); + if (notEmpty && StringUtils.isNotEmpty(key)){ + return ObjectUtils.isNotEmpty(cacheManager.getCache(cacheName).get(key).get()); + } + return notEmpty; + + } + +} diff --git a/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index e69de29..0b7acd4 100644 --- a/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-caffeine/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +com.muyu.common.caffeine.utils.CaffeineCacheUtils +com.muyu.common.caffeine.bean.CaffeineManager diff --git a/cloud-common/cloud-common-iotdb/pom.xml b/cloud-common/cloud-common-iotdb/pom.xml new file mode 100644 index 0000000..0972390 --- /dev/null +++ b/cloud-common/cloud-common-iotdb/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-iotdb + + cloud-common-iotdb 时序数据库模块 + + + 17 + 17 + UTF-8 + + + + + + com.muyu + cloud-common-core + + + + org.apache.iotdb + iotdb-session + 1.3.2 + + + + diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml new file mode 100644 index 0000000..fca6155 --- /dev/null +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.muyu + cloud-common + 3.6.3 + + + cloud-common-kafka + + cloud-common-kafka kafka中间件模块 + + + 17 + 17 + UTF-8 + + + + + + com.muyu + cloud-common-redis + + + + org.apache.kafka + kafka-clients + 3.0.0 + + + + + diff --git a/cloud-common/pom.xml b/cloud-common/pom.xml index a7a40be..b053ac7 100644 --- a/cloud-common/pom.xml +++ b/cloud-common/pom.xml @@ -21,6 +21,10 @@ cloud-common-xxl cloud-common-rabbit cloud-common-saas + cloud-common-cache + cloud-common-caffeine + cloud-common-iotdb + cloud-common-kafka cloud-common diff --git a/cloud-modules/cloud-modules-data-processing/pom.xml b/cloud-modules/cloud-modules-data-processing/pom.xml new file mode 100644 index 0000000..9e3122d --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/pom.xml @@ -0,0 +1,155 @@ + + + 4.0.0 + + com.muyu + cloud-server + 3.6.3 + + + cloud-modules-data-processing + + + cloud-data-processing 数据处理模块 + + + + 17 + 17 + UTF-8 + + + + + com.muyu + cloud-common-kafka + + + + com.muyu + cloud-common-caffeine + + + + com.muyu + cloud-common-rabbit + + + + com.muyu + cloud-common-iotdb + 3.6.3 + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.boot + spring-boot-starter-tomcat + + + + + com.mysql + mysql-connector-j + + + + + com.muyu + cloud-common-datascope + + + + com.muyu + cloud-common-datasource + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java new file mode 100644 index 0000000..7318acf --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java @@ -0,0 +1,32 @@ +package com.muyu.data.processing; + +import com.muyu.common.kafka.constants.KafkaConstants; + +import com.muyu.common.security.annotation.EnableCustomConfig; +import com.muyu.common.security.annotation.EnableMyFeignClients; +import jakarta.annotation.PostConstruct; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +/** + * 数据处理模块启动器 + * @Author: 胡杨 + * @Name: MyData + * @Description: 数据处理模块启动器 + * @CreatedDate: 2024/9/26 下午7:31 + * @FilePath: com.muyu.data.processing + */ +@EnableRabbit +@EnableCustomConfig +@EnableMyFeignClients +@SpringBootApplication(scanBasePackages = {"com.muyu"}) +public class MyDataApplication { + public static void main(String[] args) { + SpringApplication.run(MyDataApplication.class, args); + + System.out.println("MyData 模块启动成功!"); + } + +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java new file mode 100644 index 0000000..8ab8328 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -0,0 +1,100 @@ +package com.muyu.data.processing.controller; + +import com.muyu.common.core.domain.Result; +import com.muyu.common.security.utils.SecurityUtils; +import com.muyu.data.processing.domain.BasicData; +import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.service.DataProcessingService; + +import javax.annotation.Resource; + +import org.springframework.web.bind.annotation.*; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; + +/** + * 数据处理控制层 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 数据处理控制层 + * @CreatedDate: 2024/9/28 下午3:53 + * @FilePath: com.muyu.data.processing.controller + */ + +@Slf4j +@RestController +@RequestMapping("/DataProcessing") +public class DataProcessingController { + @Resource + private DataProcessingService service; + + /** + * 查看数据库有多少组 + * + * @return + */ + @GetMapping("/selectStorageGroup") + public Result selectStorageGroup() { + List v = service.selectStorageGroup();if (v.size() > 0) {v.forEach(x -> { + System.out.println("group------------------" + x.toString()); + }); + return Result.success(v); + } else { + return Result.error(false); + } + } + + @GetMapping("/selectCarData") + public Result selectCarData(@RequestParam("vin") String vin) { +// String firmCode = SecurityUtils.getSaasKey(); + String firmCode = "firm01"; + return Result.success(service.selectCarData(firmCode,vin)); + } + + @PostMapping("/addCarData") + public Result addCarData(@RequestBody IotDbData data) { + HashMap hashMap = new HashMap<>(); + hashMap.put("timestamp", BasicData + .builder() + .key("timestamp") + .label("时间戳") + .value(String.valueOf(data.getTimestamp())) + .type("string") + .build()); + hashMap.put("vin", BasicData + .builder() + .key("vin") + .label("VIN码") + .value(data.getVin()) + .type("string") + .build()); + hashMap.put("latitude", BasicData + .builder() + .key("latitude") + .label("纬度") + .value(data.getLatitude()) + .type("long") + .build()); + hashMap.put("longitude", BasicData + .builder() + .key("longitude") + .label("经度") + .value(data.getLongitude()) + .type("long") + .build()); + hashMap.put("firmCode", BasicData + .builder() + .key("firmCode") + .label("企业编码") + .value("firm01") + .type("string") + .build()); + return Result.success(service.addCarData(hashMap)); + } + + +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java new file mode 100644 index 0000000..5704ff7 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -0,0 +1,165 @@ +package com.muyu.data.processing.controller; + + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.core.utils.uuid.UUID; +import com.muyu.common.iotdb.config.IotDBSessionConfig; +import com.muyu.common.kafka.constants.KafkaConstants; +import com.muyu.common.rabbit.constants.RabbitConstants; +import jakarta.annotation.Resource; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.cache.Cache; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.cache.support.SimpleCacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.web.bind.annotation.*; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; + +/** + * 测试控制层 + * @Author: 胡杨 + * @Name: Test + * @Description: + * @CreatedDate: 2024/9/27 上午10:54 + * @FilePath: com.muyu.data.processing.controller + */ +@Slf4j +@RestController +@RequestMapping("/Test") +public class TestController { + @Resource + private KafkaProducer kafkaProducer; + @Resource + private RabbitTemplate rabbitTemplate; + @Resource + private IotDBSessionConfig iotDBSessionConfig; + @Resource + private RedisTemplate redisTemplate; +// @Resource +// private CaffeineCacheUtils cacheUtils; + + @Resource + private SimpleCacheManager cacheManager; + + @GetMapping("/testKafka") + public void sendMsg() { + try { + // 测试数据 + String jsonString = """ + [{ + "key": "vin", + "label": "VIN码", + "type": "String", + "value": "vin999999" + },{ + "key": "timestamp", + "label": "时间戳", + "type": "long", + "value": "1727534036893" + },{ + "key": "latitude", + "label": "纬度", + "type": "int", + "value": "66.898" + },{ + "key": "longitude", + "label": "经度", + "type": "int", + "value": "99.12" + }]"""; + ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); + kafkaProducer.send(producerRecord); + System.out.println("同步消息发送成功: " + jsonString); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("同步消息发送失败"); + } + } + + @GetMapping("/testRabbit/GoOnline") + public void testRabbitGoOnline(@RequestParam("msg") String msg) { + rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg, message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-","")); + return message; + }); + } + + @GetMapping("/testRabbit/Downline") + public void testRabbitDownline(@RequestParam("msg") String msg) { + rabbitTemplate.convertAndSend(RabbitConstants.DOWNLINE_QUEUE, msg, message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-","")); + return message; + }); + } + + @GetMapping("/insertData") + public void insertData(@RequestParam("deviceId") String deviceId, @RequestParam("time") long time, @RequestParam("value") double value) throws Exception { + String sql = String.format("insert into root.one.%s(timestamp, temperature) values (%d, %f)", deviceId, time, value); + iotDBSessionConfig.getSessionPool().executeNonQueryStatement(sql); + } + + @GetMapping("/testSetRedis") + public void testSetRedis(@RequestParam("key") String key,@RequestParam("value") String value) { + redisTemplate.opsForValue().set(key,value); + } + + @GetMapping("/testGetCache") + public void testGetCache(@RequestParam("cacheName") String cacheName,@RequestParam("key") String key) { + Cache cache = cacheManager.getCache(cacheName); + if (cache != null) { + String v = cache.get(key,String.class); + log.info("缓存值为: {}",v); + }else { + log.info("无缓存"); + } + } + + @GetMapping("/textSetCache") + public void textSetCache( + @RequestParam("cacheName") String cacheName, + @RequestParam("key") String key, + @RequestParam("value") String value) { + Cache cache = cacheManager.getCache(cacheName); + if (cache != null){ + cache.put(key, value); + log.info("设置缓存成功"); + }else { + log.info("无缓存"); + } + } + + @GetMapping("/testDelCache") + public void testDelCache(@RequestParam("cacheName") String cacheName) { + if (!CacheNameEnums.isCode(cacheName)){ + log.info("缓存分区不存在"); + return; + } + Cache cache = cacheManager.getCache(cacheName); + if (cache != null) { + cache.invalidate(); + log.info("删除缓存成功"); + }else{ + log.info("无缓存"); + } + } + + + @GetMapping("/testAddCache") + public void testAddCache(@RequestParam("vin") String vin) { + ArrayList caches = new ArrayList<>(); + caches.add(new CaffeineCache(vin, Caffeine.newBuilder().recordStats().build())); + cacheManager.setCaches(caches); + log.info("缓存管理器创建新分区: {}", vin); + } + + @GetMapping("/testGetCacheNames") + public void testGetCacheNames() { + cacheManager.initializeCaches(); + log.info("缓存分区列表: {}", cacheManager.getCacheNames()); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java new file mode 100644 index 0000000..706b2cf --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/BasicData.java @@ -0,0 +1,36 @@ +package com.muyu.data.processing.domain; + +import lombok.*; + +import java.io.Serializable; + +/** + * 报文信息 时序实体类 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 报文信息 时序实体类 + * @CreatedDate: 2024/9/28 下午3:48 + * @FilePath: com.muyu.data.processing.domain + */ + +@Data +@ToString +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BasicData implements Serializable { + + private String key; + private String label; + private String value; + private String type; + +// public void setValueClass() { +// Class info = ClassType.getInfo(type); +// if (info.isInstance(value)){ +// value = info.cast(value); +// } +// } +} + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java new file mode 100644 index 0000000..c2fdfb9 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java @@ -0,0 +1,25 @@ +package com.muyu.data.processing.domain; + +import lombok.*; + +/** + * 车辆信息 + * + * @Author: 胡杨 + * @Name: CarData + * @Description: 车辆信息 + * @CreatedDate: 2024/10/2 下午2:34 + * @FilePath: com.muyu.data.processing.domain + */ + +@Data +@Builder +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class CarData { + private String vin; + private long timestamp; + private String latitude; + private String longitude; +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java new file mode 100644 index 0000000..7353356 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java @@ -0,0 +1,35 @@ +package com.muyu.data.processing.domain; + +import com.muyu.common.core.web.domain.BaseEntity; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.util.Date; + +/** + * 报文信息 时序实体类 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 报文信息 时序实体类 + * @CreatedDate: 2024/9/28 下午3:48 + * @FilePath: com.muyu.data.processing.domain + */ + +@EqualsAndHashCode(callSuper = true) +@Data +@ToString +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class IotDbData extends BaseEntity { + private long timestamp; + + private String vin; + + private String latitude; + private String longitude; + + +} + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary2.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary2.java new file mode 100644 index 0000000..81a7534 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/domain/Temporary2.java @@ -0,0 +1,22 @@ +package com.muyu.data.processing.domain; + +import lombok.*; + +/** + * 临时类2 + * + * @Author: 胡杨 + * @Name: Temporary2 + * @Description: 临时类2 + * @CreatedDate: 2024/9/30 下午7:27 + * @FilePath: com.muyu.data.processing.domain + */ + +@Data +@ToString +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Temporary2 { + private String test; +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java new file mode 100644 index 0000000..b4b2196 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java @@ -0,0 +1,71 @@ +package com.muyu.data.processing.kafka; + + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.json.JSONUtil; +import com.alibaba.nacos.shaded.com.google.common.collect.Lists; +import com.muyu.common.kafka.constants.KafkaConstants; +import com.muyu.data.processing.domain.BasicData; +import com.muyu.data.processing.service.DataProcessingService; +import com.muyu.data.processing.strategy.core.StartStrategy; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * kafka消费者 + * @Author: 胡杨 + * @Name: KafkaConsumerService + * @Description: kafka消费者 + * @CreatedDate: 2024/9/27 上午9:27 + * @FilePath: com.muyu.data.processing.kafka + */ + +@Slf4j +@Component +public class KafkaConsumerService implements InitializingBean { + @Resource + private KafkaConsumer kafkaConsumer; + @Resource + private StartStrategy startStrategy; + + @Override + public void afterPropertiesSet() throws Exception { + Thread thread = new Thread(() -> { + log.info("启动线程监听Topic: {}", KafkaConstants.KafkaTopic); + ThreadUtil.sleep(1000); + Collection topics = Lists.newArrayList(KafkaConstants.KafkaTopic); + kafkaConsumer.subscribe(topics); + while (true) { + System.out.println("开始消费数据,等待中..."); + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord consumerRecord : consumerRecords) { + //1.从ConsumerRecord中获取消费数据 + String originalMsg = (String) consumerRecord.value(); + log.info("从Kafka中消费的原始数据: " + originalMsg); + //2.把消费数据转换为DTO对象 + List dataList = JSONUtil.toList(originalMsg, BasicData.class); + log.info("从Kafka中消费的实体数据: " + dataList); + // 执行策略 + startStrategy.applyStrategy(getDataMap(dataList)); + } + } + }); + thread.start(); + } + + private HashMap getDataMap(List dataList) { + HashMap basicDataHashMap = new HashMap<>(); + dataList.forEach(data -> basicDataHashMap.put(data.getKey(), data)); + return basicDataHashMap; + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java new file mode 100644 index 0000000..fdd1cfe --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java @@ -0,0 +1,35 @@ +package com.muyu.data.processing.mapper; + +import com.muyu.data.processing.domain.CarData; +import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.BasicData; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; + +import java.util.List; + +/** + * 数据处理持久层 + * + * @Author: 胡杨 + * @Name: DataPeocessingMapper + * @Description: 数据处理持久层 + * @CreatedDate: 2024/9/28 下午3:47 + * @FilePath: com.muyu.data.processing.mapper + */ + +@Repository +@Mapper +public interface DataProcessingMapper{ + + List selectStorageGroup(); + + Integer insIotDbData(@Param("key") String key, @Param("value") String value); + + void strategyCheck(@Param("dataList") List dataList); + + Integer insIotDbDataVo(IotDbData build); + + List selectCarData(@Param("tableName") String tableName); +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java new file mode 100644 index 0000000..c3a38b6 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java @@ -0,0 +1,74 @@ +package com.muyu.data.processing.rebbit; + + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.rabbit.constants.RabbitConstants; +import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashSet; + +/** + * 下线事件监听 + * @Author: 胡杨 + * @Name: DownlineRabbitConsumer + * @Description: 车辆下线监听器 + * @CreatedDate: 2024/9/26 下午8:21 + * @FilePath: com.muyu.data.processing.rebbit + */ +@Slf4j +@Component +@Setter +public class DownlineRabbitConsumer { + @Resource + private RedisTemplate redisTemplate; + @Resource + private CacheManager cacheManager; + + @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)}) + public void downline(String vin, Message message, Channel channel) { + log.info("车辆 {} 下线, 配置信息准备中。。。",vin); + try { + // 重复性校验 + Long add = redisTemplate.opsForSet().add(RabbitConstants.DOWNLINE_QUEUE, message.getMessageProperties().getMessageId()); + if (add>0) { + deleteCarCache(vin); + log.info("车辆 {} 下线, 消息已确认。。。",vin); + } else { + log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin); + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("车辆 {} 下线, 配置信息已准备完毕。。。",vin); + } catch (IOException e) { + try { + log.warn("车辆 {} 下线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + } catch (IOException ex) { + log.warn("车辆 {} 下线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); + } + } + } + + + /** + * 车辆下线 - 删除缓存 + */ + public void deleteCarCache(String vin) { + Cache cache = cacheManager.getCache(vin); + if (ObjectUtils.isNotEmpty(cache)){ + cache.invalidate(); + } + log.info("车辆编码:{},本地缓存删除完成...", vin); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java new file mode 100644 index 0000000..05bae21 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -0,0 +1,74 @@ +package com.muyu.data.processing.rebbit; + + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.rabbit.constants.RabbitConstants; +import com.rabbitmq.client.Channel; +import jakarta.annotation.Resource; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.List; + +/** + * 上线事件监听 + * @Author: 胡杨 + * @Name: GoOnlineRabbitConsumer + * @Description: 上线事件 + * @CreatedDate: 2024/9/26 下午7:38 + * @FilePath: com.muyu.data.processing.rebbit + */ +@Slf4j +@Component +@Setter +public class GoOnlineRabbitConsumer { + @Resource + private RedisTemplate redisTemplate; + @Resource + private CacheManager cacheManager; + + @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) + public void goOnline(String vin, Message message, Channel channel){ + log.info("车辆 {} 上线, 配置信息准备中。。。",vin); + try { + // 重复性校验 + Long add = redisTemplate.opsForSet().add(RabbitConstants.GO_ONLINE_QUEUE, message.getMessageProperties().getMessageId()); + if (add>0) { + addCarCache(vin); + log.info("车辆 {} 上线, 消息已确认。。。",vin); + } else { + log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); + } + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("车辆 {} 上线, 配置信息已准备完毕。。。",vin); + } catch (IOException e) { + try { + log.warn("车辆 {} 上线, 配置信息准备失败,返回队列,原因:{}", vin, e.getMessage()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + } catch (IOException ex) { + log.warn("车辆 {} 上线, 消息返回队列失败,原因:{}", vin, ex.getMessage()); + } + } + } + + /** + * 车辆上线 - 新增缓存 + */ + public void addCarCache(String vin) { + // 从Redis中获取缓存信息 + for (String name : CacheNameEnums.getCodes()) { + String value = redisTemplate.opsForValue().get(name+":"+vin); + cacheManager.getCache(name).put(vin, value); + log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value); + } + log.info("车辆编码:{},本地缓存完成...",vin); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java new file mode 100644 index 0000000..67cacf3 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java @@ -0,0 +1,36 @@ +package com.muyu.data.processing.service; + + +import com.muyu.data.processing.domain.BasicData; +import com.muyu.data.processing.domain.CarData; + +import java.util.HashMap; +import java.util.List; + +/** + * 数据处理业务层 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 数据处理业务层 + * @CreatedDate: 2024/9/28 下午3:52 + * @FilePath: com.muyu.data.processing.server + */ + +public interface DataProcessingService{ + + /** + * 选择存储组 + * + * @return {@link List }<{@link String }> + */ + List selectStorageGroup(); + + void strategyCheck(List dataList); + + Integer insIotDbData(String key, String value); + + List selectCarData(String firmCode, String vin); + + Object addCarData(HashMap hashMap); +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java new file mode 100644 index 0000000..8d824bb --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -0,0 +1,148 @@ +package com.muyu.data.processing.service.impl; + + +import javax.annotation.Resource; + +import com.muyu.common.iotdb.config.IotDBSessionConfig; +import com.muyu.data.processing.domain.CarData; +import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.BasicData; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.springframework.stereotype.Service; +import lombok.extern.slf4j.Slf4j; +import com.muyu.data.processing.mapper.DataProcessingMapper; +import com.muyu.data.processing.service.DataProcessingService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * 数据处理实现层 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 数据处理实现层 + * @CreatedDate: 2024/9/28 下午3:52 + * @FilePath: com.muyu.data.processing.server.impl + */ + +@Slf4j +@Service +public class DataProcessingServiceImpl implements DataProcessingService { + @Resource + private DataProcessingMapper mapper; + @Resource + private SessionPool sessionPool; + + + @Override + public List selectStorageGroup() { + return mapper.selectStorageGroup(); + } + + @Override + public void strategyCheck(List dataList) { + HashMap kafkaDataHashMap = new HashMap<>(); + dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data)); +// Result result = rootStrategy.applyStrategy(kafkaDataHashMap); +// String[] data = result.getData(); +// insIotDbData(data[0],data[1]); + IotDbData build = IotDbData.builder() + .vin(kafkaDataHashMap.get("vin").getValue()) + .timestamp(Long.parseLong(kafkaDataHashMap.get("timestamp").getValue())) + .latitude(kafkaDataHashMap.get("latitude").getValue()) + .longitude(kafkaDataHashMap.get("longitude").getValue()) + .build(); + mapper.insIotDbDataVo(build); +// dataList.forEach(KafkaData::setValueClass); +// mapper.strategyCheck(dataList); + } + + @Override + public Integer insIotDbData(String key, String value) { + return mapper.insIotDbData(key, value); + } + + @Override + public List selectCarData(String firmCode, String vin) { + ArrayList carDataList = new ArrayList<>(); + String sql = "select * from root.one."+firmCode+"."+vin; + try { + SessionDataSetWrapper dataSetWrapper = sessionPool.executeQueryStatement(sql); + List columnNames = dataSetWrapper.getColumnNames(); + while (dataSetWrapper.hasNext()){ + RowRecord next = dataSetWrapper.next(); + CarData data = getCarData(vin, next, columnNames); + carDataList.add(data); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return carDataList; + } + + @Override + public Object addCarData(HashMap hashMap) { +// StringBuilder sql = new StringBuilder("insert into root.one."); +// sql.append(hashMap.get("firmCode").getValue()) +// .append(".") +// .append(hashMap.get("vin").getValue()) +// .append("("); +// hashMap.remove("firmCode"); +// hashMap.remove("vin"); +// StringBuilder keys = new StringBuilder(); +// StringBuilder values = new StringBuilder(); +// hashMap.keySet().forEach(key -> { +// if (hashMap.get(key) != null) { +// keys.append(key).append(","); +// if ("String".equals(hashMap.get(key).getType())) { +// values.append("'") +// .append(hashMap.get(key).getValue()) +// .append("'") +// .append(","); +// }else { +// values.append(hashMap.get(key).getValue()) +// .append(","); +// } +// } +// }); +// sql.append(keys.substring(0, keys.length() - 1)) +// .append(") values (") +// .append(values.substring(0, values.length() - 1)) +// .append(")"); +// try { +// sessionPool.executeNonQueryStatement(sql.toString()); +// } catch (StatementExecutionException e) { +// throw new RuntimeException(e); +// } catch (IoTDBConnectionException e) { +// throw new RuntimeException(e); +// } +// log.info("成功执行sql语句: [{}]", sql); +// return sql; + return null; + } + + private static CarData getCarData(String vin, RowRecord next, List columnNames) { + List fields = next.getFields(); + CarData data = new CarData(); + data.setVin(vin); + data.setTimestamp(next.getTimestamp()); + for (int i = 0; i < columnNames.size(); i++) { + if (columnNames.get(i).contains("latitude")) { + data.setLatitude(fields.get(i-1).getStringValue()); + }else if (columnNames.get(i).contains("longitude")) { + data.setLongitude(fields.get(i-1).getStringValue()); + } + } + return data; + } + + +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java new file mode 100644 index 0000000..ee5f3b7 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/StrategyHandler.java @@ -0,0 +1,24 @@ +package com.muyu.data.processing.strategy; + +import com.muyu.data.processing.strategy.core.EndStrategy; + +/** + * 策略控制者接口 + * @Author: 胡杨 + * @Name: StrategyHandler + * @Description: 策略控制者接口 + * @CreatedDate: 2024/9/28 上午9:35 + * @FilePath: com.muyu.data.processing.strategy + */ +public interface StrategyHandler { + + @SuppressWarnings("rawtypes") + StrategyHandler DEFAULT = param -> new EndStrategy(); + + /** + * 执行方法 + * @param t 入参 + * @return 返回结果 + */ + R apply(T t); +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/abstractStrategyRouter.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/abstractStrategyRouter.java new file mode 100644 index 0000000..9da9691 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/abstractStrategyRouter.java @@ -0,0 +1,60 @@ +package com.muyu.data.processing.strategy; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Objects; +/** + * 抽象策略路由 + * @Author: 胡杨 + * @Name: abstractStrategyRouter + * @Description: 抽象策略路由 + * @CreatedDate: 2024/9/28 上午9:26 + * @FilePath: com.muyu.data.processing.strategy + */ +@Slf4j +@Component +public abstract class abstractStrategyRouter { + + /** + * 策略映射器, 指定入参与出参以决定策略处理者 + * @param 策略入参 + * @param 策略出参 + */ + public interface StrategyMapper{ + // 通过入参获取对应策略处理方法,使用Map实现 + StrategyHandler getHandler(T param); + } + + /** + * 选择下级策略 + * @return + */ + protected abstract StrategyMapper registerStrategy(); + + /** + * 默认策略处理者 + */ + @SuppressWarnings("unchecked") + private StrategyHandler defaultStrategyHandler = StrategyHandler.DEFAULT; + + + /** + * 选择策略处理者 + * @param param 入参 + * @return 策略处理结果 + */ + public R applyStrategy(T param) { + StrategyMapper trStrategyMapper = registerStrategy(); + if (trStrategyMapper == null) { + return defaultStrategyHandler.apply(param); + } + final StrategyHandler strategyHandler = trStrategyMapper.getHandler(param); + if (strategyHandler != null) { + return strategyHandler.apply(param); + } + // 使用默认策略处理者 + return defaultStrategyHandler.apply(param); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java new file mode 100644 index 0000000..47112da --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/DataStorageProcessStrategy.java @@ -0,0 +1,63 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; + +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.EndStrategy; +import com.muyu.data.processing.strategy.leaves.DataStorageStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 持久化数据处理 + * 数据持久化之前的数据调整 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 数据持久化数据处理 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class DataStorageProcessStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + // 必要参数 + private final static HashMap NECESSARY_PARAM = new HashMap<>(); + static { + NECESSARY_PARAM.put("VIN","VIN码"); + NECESSARY_PARAM.put("timestamp","时间戳"); + NECESSARY_PARAM.put("longitude","经度"); + NECESSARY_PARAM.put("latitude","纬度"); + } + + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> { + // 判断是否存在问题 + if (param.containsKey("DataStorageProcessStrategy")) { + log.error("持久化流程错误,缺少必要参数: {}", param.get("DataStorageProcessStrategy").getKey()); + param.remove("DataStorageProcessStrategy"); + return new EndStrategy(); + } + log.info("持久化数据处理节点已通过。。。"); + return new DataStorageStrategy(); + }; + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("持久化数据处理节点开始处理。。。"); + // 判断是否缺少必要参数,如果有,记录 + NECESSARY_PARAM.keySet().forEach(key->{ + if (!basicDataMap.containsKey(key)) { + basicDataMap.put("DataStorageProcessStrategy", BasicData.builder().key(NECESSARY_PARAM.get(key)).build()); + } + }); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java new file mode 100644 index 0000000..25bbacd --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultJudgmentStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.leaves.DataStorageStrategy; +import com.muyu.data.processing.strategy.leaves.FaultAlarmStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 故障数据判断 + * 判断是否故障 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 故障数据判断 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FaultJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new FaultAlarmStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障判断节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java new file mode 100644 index 0000000..405f2ca --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FaultProcessingStrategy.java @@ -0,0 +1,37 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.leaves.DataStorageStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 故障数据处理 + * 调整故障判断规则 + * @Author: 胡杨 + * @Name: FaultProcessingStrategy + * @Description: 故障参数处理 + * @CreatedDate: 2024/9/30 下午7:47 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FaultProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new FaultJudgmentStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java new file mode 100644 index 0000000..dd73ce0 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceJudgmentStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.leaves.FaultAlarmStrategy; +import com.muyu.data.processing.strategy.leaves.FenceAlarmStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 围栏数据判断 + * 判断是否围栏违规 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 围栏数据判断 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FenceJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new FenceAlarmStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏数据判断节点通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java new file mode 100644 index 0000000..bb5d7f4 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/FenceProcessingStrategy.java @@ -0,0 +1,36 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 围栏数据处理 + * 调整围栏判断规则 + * @Author: 胡杨 + * @Name: FaultProcessingStrategy + * @Description: 围栏参数处理 + * @CreatedDate: 2024/9/30 下午7:47 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FenceProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new FenceJudgmentStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java new file mode 100644 index 0000000..f908c0e --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeJudgmentStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.leaves.RealTimeAlarmStrategy; +import com.muyu.data.processing.strategy.leaves.WarningAlarmStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 实时数据判断 + * 判断实时数据情况 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 实时数据判断 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class RealTimeJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new RealTimeAlarmStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据判断节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java new file mode 100644 index 0000000..45ebf07 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/RealTimeProcessingStrategy.java @@ -0,0 +1,36 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 实时数据处理 + * 调整实时数据 + * @Author: 胡杨 + * @Name: FaultProcessingStrategy + * @Description: 实时数据理 + * @CreatedDate: 2024/9/30 下午7:47 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class RealTimeProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new RealTimeJudgmentStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java new file mode 100644 index 0000000..58b6f0c --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningJudgmentStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.leaves.FenceAlarmStrategy; +import com.muyu.data.processing.strategy.leaves.WarningAlarmStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 预警数据判断 + * 判断预警策略 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 预警数据判断 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class WarningJudgmentStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new WarningAlarmStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警数据判断节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java new file mode 100644 index 0000000..f068e9e --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/WarningProcessingStrategy.java @@ -0,0 +1,36 @@ +package com.muyu.data.processing.strategy.branch; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 预警数据处理 + * 调整预警使用策略 + * @Author: 胡杨 + * @Name: FaultProcessingStrategy + * @Description: 预警数据处理 + * @CreatedDate: 2024/9/30 下午7:47 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class WarningProcessingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param-> new WarningJudgmentStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java new file mode 100644 index 0000000..d67112b --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/BasicStrategy.java @@ -0,0 +1,41 @@ +package com.muyu.data.processing.strategy.core; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 基础校验节点 + * 负责基础校验 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 基础校验节点 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class BasicStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new RoutingStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("开始执行基础校验节点。。。"); + basicDataMap.put(CacheNameEnums.STORAGE.getCode(), null); + basicDataMap.put(CacheNameEnums.FAULT.getCode(), null); + basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null); + log.info("基础校验节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java new file mode 100644 index 0000000..410631a --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/EndStrategy.java @@ -0,0 +1,29 @@ +package com.muyu.data.processing.strategy.core; + +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 结束节点 + * + * @Author: 胡杨 + * @Name: EndStrategy + * @Description: 策略树 - 结束节点 + * @CreatedDate: 2024/9/30 下午7:13 + * @FilePath: com.muyu.data.processing.strategy.leaves + */ + +@Slf4j +@Component +public class EndStrategy implements StrategyHandler, Temporary2> { + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("结束节点已通过。。。"); + return null; + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java new file mode 100644 index 0000000..ce46f71 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/RoutingStrategy.java @@ -0,0 +1,59 @@ +package com.muyu.data.processing.strategy.core; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap;import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.branch.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 路由节点 + * 根据条件重新导向对应节点 + * @Author: 胡杨 + * @Name: RoutingStrategy + * @Description: 路由节点 + * @CreatedDate: 2024/9/30 下午7:37 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class RoutingStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + + private final static HashMap, Temporary2>> map = new HashMap<>(); + static{ + map.put(CacheNameEnums.WARMING.getCode(), new WarningProcessingStrategy()); + map.put(CacheNameEnums.REALTIME.getCode(), new RealTimeProcessingStrategy()); + map.put(CacheNameEnums.FENCE.getCode(), new FenceProcessingStrategy()); + map.put(CacheNameEnums.FAULT.getCode(), new FaultProcessingStrategy()); + map.put(CacheNameEnums.STORAGE.getCode(), new DataStorageProcessStrategy()); + } + + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> { + // 编写路由规则 + List codes = CacheNameEnums.getCodes(); + for (String code : codes) { + if(param.containsKey(code)){ + param.remove(code); + return map.get(code); + } + } + // 默认返回结束节点 + return new EndStrategy(); + }; + }; + + + @Override + public Temporary2 apply(HashMap stringListHashMap) { + log.info("路由节点已通过。。。"); + return applyStrategy(stringListHashMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java new file mode 100644 index 0000000..c451af9 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/core/StartStrategy.java @@ -0,0 +1,47 @@ +package com.muyu.data.processing.strategy.core; + + + +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; + +/** + * 开始节点 + * + * @Author: 胡杨 + * @Name: StartStrategy + * @Description: 策略路由实现 + * @CreatedDate: 2024/9/28 上午10:39 + * @FilePath: com.muyu.data.processing.strategy.impl + * 开始节点 + * ↓ + * 基础校验节点 + * ↓ + * 路由节点 ← ← ← ← ← ← + * ↙ ↙ ↓ ↘ ↘ + * 数据处理节点 预警处理节点 故障处理节点 围栏处理节点 实时数据数据处理节点 ↑ + * ↓ ↓ ↓ ↓ ↓ + * 数据持久化处理节点 预警处理节点 故障处理节点 围栏处理节点 实时数据数据处理节点 ↑ + * ↓ ↓ ↓ ↓ ↓ + * ↓ 预警通知节点 故障通知节点 围栏通知节点 实时数据处理节点 ↑ + * ↓ ↘ ↓ ↙ ↙ + * ↓ ↓ ↑ + * → → → 路由节点 → → → → → → + * ↓ + * 结束节点 + */ + +@Slf4j +@Component +public class StartStrategy extends abstractStrategyRouter, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new BasicStrategy(); + } + +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java new file mode 100644 index 0000000..a57c426 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/DataStorageStrategy.java @@ -0,0 +1,90 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.common.iotdb.config.IotDBSessionConfig; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.service.DataProcessingService; +import com.muyu.data.processing.service.impl.DataProcessingServiceImpl; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.RoutingStrategy; +import jakarta.annotation.Resource; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 数据持久化 + * 车辆数据进行持久化 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 数据持久化 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class DataStorageStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> { + log.info("数据持久化节点已通过。。。"); + return new RoutingStrategy(); + }; + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + // 执行持久化方法 + addCarData(basicDataMap); + return applyStrategy(basicDataMap); + } + + private void addCarData(HashMap hashMap) { + StringBuilder sql = new StringBuilder("insert into root.one."); + sql.append(hashMap.get("firmCode").getValue()) + .append(".") + .append(hashMap.get("VIN").getValue()) + .append("("); + hashMap.remove("firmCode"); + hashMap.remove("VIN"); + StringBuilder keys = new StringBuilder(); + StringBuilder values = new StringBuilder(); + hashMap.keySet().forEach(key -> { + if (hashMap.get(key) != null) { + keys.append(key).append(","); + if ("String".equals(hashMap.get(key).getType())) { + values.append("'") + .append(hashMap.get(key).getValue()) + .append("'") + .append(","); + }else { + values.append(hashMap.get(key).getValue()) + .append(","); + } + } + }); + sql.append(keys.substring(0, keys.length() - 1)) + .append(") values (") + .append(values.substring(0, values.length() - 1)) + .append(")"); + try { + new IotDBSessionConfig().getSessionPool().executeNonQueryStatement(sql.toString()); + } catch (StatementExecutionException e) { + throw new RuntimeException(e); + } catch (IoTDBConnectionException e) { + throw new RuntimeException(e); + } + log.info("成功执行sql语句: [{}]", sql); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java new file mode 100644 index 0000000..2ebad77 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FaultAlarmStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.RoutingStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 故障报警 + * 故障数据记录并报警 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 故障报警 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FaultAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new RoutingStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("故障报警节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java new file mode 100644 index 0000000..6845011 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FenceAlarmStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.RoutingStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 围栏报警 + * 围栏数据记录并报警 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 围栏报警 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class FenceAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new RoutingStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("围栏报警节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java new file mode 100644 index 0000000..0a4cbc3 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/RealTimeAlarmStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.RoutingStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 实时数据 + * 处理实时数据事件 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 实时数据 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class RealTimeAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new RoutingStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("实时数据处理节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java new file mode 100644 index 0000000..8026940 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/WarningAlarmStrategy.java @@ -0,0 +1,38 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.caffeine.enums.CacheNameEnums; +import com.muyu.data.processing.domain.BasicData; +import java.util.HashMap; +import java.util.List; +import com.muyu.data.processing.domain.Temporary2; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.strategy.abstractStrategyRouter; +import com.muyu.data.processing.strategy.core.RoutingStrategy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 预警报警 + * 预警数据记录并提示 + * @Author: 胡杨 + * @Name: BasicStrategy + * @Description: 责任树 - 预警提示 + * @CreatedDate: 2024/9/30 下午7:24 + * @FilePath: com.muyu.data.processing.strategy.branch + */ + +@Slf4j +@Component +public class WarningAlarmStrategy extends abstractStrategyRouter, Temporary2> + implements StrategyHandler, Temporary2> { + @Override + protected StrategyMapper, Temporary2> registerStrategy() { + return param -> new RoutingStrategy(); + } + + @Override + public Temporary2 apply(HashMap basicDataMap) { + log.info("预警报警节点已通过。。。"); + return applyStrategy(basicDataMap); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java new file mode 100644 index 0000000..c6e96cf --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java @@ -0,0 +1,28 @@ +package com.muyu.data.processing.utils; + +import org.springframework.stereotype.Component; + +/** + * 数据处理工具类 + * + * @Author: 胡杨 + * @Name: DataUtils + * @Description: 数据处理工具类 + * @CreatedDate: 2024/9/29 上午10:15 + * @FilePath: com.muyu.data.processing.utils + */ + +@Component +public class DataUtils { + + /** + * 类型转换 + * @param data 转换值 + * @param type 转换类型 + * @return 转换结果 + * @param 返回类型 + */ + public static T convert(Object data, Class type) { + return type.cast(data); + } +} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/banner.txt b/cloud-modules/cloud-modules-data-processing/src/main/resources/banner.txt new file mode 100644 index 0000000..0dd5eee --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/banner.txt @@ -0,0 +1,2 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/bootstrap.yml b/cloud-modules/cloud-modules-data-processing/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..b40171b --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/bootstrap.yml @@ -0,0 +1,72 @@ +# Tomcat +server: + port: 9711 + +# nacos线上地址 +nacos: + addr: 47.116.173.119:8848 + user-name: nacos + password: nacos + namespace: one-saas +# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all +# Spring +spring: + iotdb: + ip: 47.116.173.119 + port: 6667 + user: root + password: root + fetchSize: 10000 + maxActive: 10 + amqp: + deserialization: + trust: + all: true + main: + allow-bean-definition-overriding: true + application: + # 应用名称 + name: cloud-data-processing + profiles: + # 环境配置 + active: dev + cloud: + nacos: + discovery: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + config: + # 服务注册地址 + server-addr: ${nacos.addr} + # nacos用户名 + username: ${nacos.user-name} + # nacos密码 + password: ${nacos.password} + # 命名空间 + namespace: ${nacos.namespace} + # 配置文件格式 + file-extension: yml + # 共享配置 + shared-configs: + # 系统共享配置 + - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # 系统环境Config共享配置 + - application-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # xxl-job 配置文件 + - application-xxl-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # rabbit 配置文件 + - application-rabbit-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + # kafka 配置文件 + - application-kafka-config-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} + +logging: + level: + com.muyu.system.mapper: DEBUG + +cacheNames: fault,fence,warming diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/dev.xml b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/dev.xml new file mode 100644 index 0000000..8af8bf1 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/dev.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + ${log.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/prod.xml b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/prod.xml new file mode 100644 index 0000000..35744e3 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/prod.xml @@ -0,0 +1,81 @@ + + + + + + + + + + + + ${log.sky.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/test.xml b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/test.xml new file mode 100644 index 0000000..35744e3 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/logback/test.xml @@ -0,0 +1,81 @@ + + + + + + + + + + + + ${log.sky.pattern} + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + + + ERROR + + ACCEPT + + DENY + + + + + + + + ${log.sky.pattern} + + + + + + + + + + + + + + + + + + + + diff --git a/cloud-modules/cloud-modules-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml b/cloud-modules/cloud-modules-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml new file mode 100644 index 0000000..7766db8 --- /dev/null +++ b/cloud-modules/cloud-modules-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml @@ -0,0 +1,38 @@ + + + + + + + + + + insert into root.one.data(${key}) values(${value}); + + + insert into root.one.data + ( + + ${data.key} + + ) values + ( + + #{data.value} + + ) + + + + insert into + root.one.data + (timestamp, vin, latitude,longitude) + values (#{timestamp}, #{vin}, #{latitude}, #{longitude}) + + + + diff --git a/cloud-modules/pom.xml b/cloud-modules/pom.xml index 34653b3..8abfb62 100644 --- a/cloud-modules/pom.xml +++ b/cloud-modules/pom.xml @@ -13,6 +13,7 @@ cloud-modules-gen cloud-modules-file cloud-modules-openbusiness + cloud-modules-data-processing cloud-modules diff --git a/pom.xml b/pom.xml index f4a514b..6171f59 100644 --- a/pom.xml +++ b/pom.xml @@ -293,7 +293,7 @@ com.muyu - cloud-data-processing + cloud-modules-data-processing ${muyu.version}