diff --git a/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/enums/ClassType.java b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/enums/ClassType.java new file mode 100644 index 0000000..69db8d3 --- /dev/null +++ b/cloud-common/cloud-common-core/src/main/java/com/muyu/common/core/enums/ClassType.java @@ -0,0 +1,63 @@ +package com.muyu.common.core.enums; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * 类型枚举 + * + * @Author: 胡杨 + * @Name: ClassType + * @Description: 类型枚举 + * @CreatedDate: 2024/9/29 上午9:28 + * @FilePath: com.muyu.common.core.enums + */ + +@Getter +public enum ClassType { + BYTE("byte", byte.class), + SHORT("short", short.class), + INT("int", int.class), + LONG("long", long.class), + FLOAT("float", float.class), + DOUBLE("double", double.class), + BOOLEAN("boolean", boolean.class), + CHAR("char", char.class), + STRING("String", String.class), + SET("Set", Set.class), + MAP("Map", Map.class), + LIST("List", List.class); + + private final String code; + private final Class info; + + ClassType(String code, Class info) { + this.code = code; + this.info = info; + } + + /** + * 鉴别参数是否是枚举的值 + * + * @param code 需鉴别参数 + * @return 如果存在返回结果turn, 否则返回false + */ + public static boolean isCode(String code) { + return Arrays.stream(values()) + .map(ClassType::getCode) + .anyMatch(c -> c.equals(code)); + } + + + public static Class getInfo(String code) { + return Arrays.stream(values()) + .filter(c -> c.getCode().equals(code)) + .findFirst() + .map(ClassType::getInfo) + .orElse(null); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java index d4c3d13..071a7c5 100644 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java @@ -8,7 +8,7 @@ package com.muyu.common.kafka.constants; */ public class KafkaConstants { - public final static String KafkaTopic = "kafka_topic"; + public final static String KafkaTopic = "kafka_topic2"; - public final static String KafkaGrop = "kafka_grop"; + public final static String KafkaGrop = "kafka_grop2"; } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java index e62b6c0..a527785 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java @@ -4,11 +4,13 @@ import com.muyu.common.kafka.constants.KafkaConstants; import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; +import jakarta.annotation.PostConstruct; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; /** + * 数据处理模块启动器 * @Author: 胡杨 * @Name: MyData * @Description: 数据处理模块启动器 diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java index 7909d7c..657d84a 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/DataProcessingController.java @@ -29,112 +29,6 @@ public class DataProcessingController { @Resource private DataProcessingService service; - - - @RequestMapping(value = "/createCarData", method = RequestMethod.POST) -// @Transactional(rollbackFor = Exception.class) - public Result createCarData(@RequestBody IotDbData data) { - try { - data.setTimestamp(System.currentTimeMillis()); - data.setCreateTime(new Date()); - Integer v = service.createCarData(data); - if (v == -1) { - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("创建车辆报文记录失败!" + e); - return Result.error(false); - } - } - - - /** - * 更新操作 其实也是插入操作 时间戳相同 只和时间戳相关 - * @param data - * @return - */ - @RequestMapping(value = "/updateCarData", method = RequestMethod.POST) - public Result updateCarData(@RequestBody IotDbData data) { - try { - data.setTimestamp(System.currentTimeMillis()); - data.setCreateTime(new Date()); - Integer v = service.updateCarData(data); - if (v == -1) { - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("更新车辆报文记录失败!" + e); - return Result.error(false); - } - } - - - /** - * 删除操作 要将时间戳的加号变成%2B - * @param timestamp - * @return - */ - @RequestMapping(value = "/deleteCarData", method = RequestMethod.GET) - public Result deleteCarData(String timestamp) { - try { - Integer v = service.deleteCarData(timestamp); - if (v == -1) { - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("删除车辆报文记录失败!" + e); - return Result.error(false); - } - } - - - /** - * 创建组 也就是相当于数据库 - * @return - */ - @RequestMapping(value = "/createCarDataGroup", method = RequestMethod.POST) - public Result createCarDataGroup() { - try { - Integer v = service.createCarDataGroup(); - if (v > 0) { - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("创建车辆报文记录组失败!" + e); - return Result.error(false); - } - } - - /** - * 查询所有车辆报文记录数据 - * @return - */ - @RequestMapping(value = "/queryCarData", method = RequestMethod.GET) - public Result queryCarData() { - try { - List v = service.queryCarData(); - if (v.size() > 0) { - v.forEach(x -> { - System.out.println(x.toString()); - }); - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("查询车辆报文记录组失败!" + e); - return Result.error(false); - } - } - /** * 查看数据库有多少组 * @@ -142,19 +36,14 @@ public class DataProcessingController { */ @RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET) public Result selectStorageGroup() { - try { - List v = service.selectStorageGroup(); - if (v.size() > 0) { - v.forEach(x -> { - System.out.println("group------------------" + x.toString()); - }); - return Result.success(v); - } else { - return Result.error(false); - } - } catch (Exception e) { - log.info("查询组失败!" + e); + List v = service.selectStorageGroup();if (v.size() > 0) {v.forEach(x -> { + System.out.println("group------------------" + x.toString()); + }); + return Result.success(v); + } else { return Result.error(false); } } + + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index 48602df..cffd65d 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -1,16 +1,13 @@ package com.muyu.data.processing.controller; -import com.alibaba.fastjson.JSONObject; import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.req.TestReq; import com.muyu.data.processing.domain.resp.TestResp; import com.muyu.data.processing.strategy.root.RootStrategy; import jakarta.annotation.Resource; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; @@ -32,30 +29,44 @@ public class TestController { @GetMapping("/testKafka") public void sendMsg(@RequestParam("msg") String msg) { try { - IotDbData iotDbData = IotDbData.builder() - .timestamp(System.currentTimeMillis()) - .vin("vin666") - .key("test") - .label("测试数据") - .value("Kafka测试") - .type("String") - .build(); - String jsonString = JSONObject.toJSONString(iotDbData); + // 测试数据 + String jsonString = """ + [{ + "key": "vin", + "label": "VIN码", + "type": "String", + "value": "vin131413534474" + },{ + "key": "timestamp", + "label": "时间戳", + "type": "long", + "value": 1727534036893L + },{ + "key": "latitude", + "label": "纬度", + "type": "float", + "value": 66.898F + },{ + "key": "longitude", + "label": "经度", + "type": "float", + "value": 99.124F + }]"""; ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); kafkaProducer.send(producerRecord); - System.out.println("同步消息发送成功: " + msg); + System.out.println("同步消息发送成功: " + jsonString); } catch (Exception e) { e.printStackTrace(); - System.out.println("同步消息发送失败: " + msg); + System.out.println("同步消息发送失败"); } } - @Resource - private RootStrategy rootStrategy; - - @PostMapping("/testStrategy") - public TestResp testStrategy(@RequestBody TestReq testReq) { - return rootStrategy.applyStrategy(testReq); - } +// @Resource +// private RootStrategy rootStrategy; +// +// @PostMapping("/testStrategy") +// public TestResp testStrategy(@RequestBody TestReq testReq) { +// return rootStrategy.applyStrategy(testReq); +// } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java new file mode 100644 index 0000000..d2eb581 --- /dev/null +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java @@ -0,0 +1,53 @@ +package com.muyu.data.processing.domain; + +import com.muyu.common.core.enums.ClassType; +import lombok.*; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; + +/** + * 报文信息 时序实体类 + * + * @Author: 胡杨 + * @Name: DataProcessing + * @Description: 报文信息 时序实体类 + * @CreatedDate: 2024/9/28 下午3:48 + * @FilePath: com.muyu.data.processing.domain + */ + +@Data +@ToString +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class KafkaData implements Serializable { + + private String key; + private String label; + private Object value; + private String type; + + public void setType(String type) { + this.type = type; + if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){ + setValueClass(); + } + } + + public void setValue(Object value) { + this.value = value; + if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){ + setValueClass(); + } + } + + public void setValueClass() { + Class info = ClassType.getInfo(type); + if (info.isInstance(value)){ + value = info.cast(value); + } + } +} + diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/StrategyEums.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/StrategyEums.java deleted file mode 100644 index b018e6c..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/StrategyEums.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.muyu.data.processing.domain; - -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import com.muyu.data.processing.strategy.branch.OneBranchStrategy; -import com.muyu.data.processing.strategy.branch.TwoBranchStrategy; -import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy; -import lombok.Getter; - -import java.util.Arrays; - -/** - * 策略选择枚举 - * - * @Author: 胡杨 - * @Name: StrategyEums - * @Description: 策略选择枚举 - * @CreatedDate: 2024/9/28 上午11:59 - * @FilePath: com.muyu.data.processing.domain - */ - -@Getter -public enum StrategyEums { - TEST1("加减法", new OneBranchStrategy()), - TEST2("乘除法", new TwoBranchStrategy()), - TEST1_1("加法", new OneLeavesStrategy()), - TEST1_2("减法", new TwoLeavesStrategy()), - TEST2_1("乘法", new ThreeLeavesStrategy()), - TEST2_2("除法", new FourLeavesStrategy()); - - private final String code; - private final StrategyHandler info; - - StrategyEums(String code, StrategyHandler info) { - this.code = code; - this.info = info; - } - - /** - * 鉴别参数是否是枚举的值 - * - * @param code 需鉴别参数 - * @return 如果存在返回结果turn, 否则返回false - */ - public static boolean isCode(String code) { - return Arrays.stream(values()) - .map(StrategyEums::getCode) - .anyMatch(c -> c.equals(code)); - } - - public static StrategyHandler getStrategy(String code) { - return Arrays.stream(values()) - .filter(c -> c.getCode().equals(code)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("参数错误")) - .getInfo(); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java index 3fecf5a..9c44c6c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java @@ -5,7 +5,8 @@ import cn.hutool.core.thread.ThreadUtil; import cn.hutool.json.JSONUtil; import com.alibaba.nacos.shaded.com.google.common.collect.Lists; import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.service.DataProcessingService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -16,8 +17,10 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.util.Collection; +import java.util.List; /** + * kafka消费者 * @Author: 胡杨 * @Name: KafkaConsumerService * @Description: kafka消费者 @@ -30,6 +33,8 @@ import java.util.Collection; public class KafkaConsumerService implements InitializingBean { @Resource private KafkaConsumer kafkaConsumer; + @Resource + private DataProcessingService service; @Override public void afterPropertiesSet() throws Exception { @@ -39,14 +44,16 @@ public class KafkaConsumerService implements InitializingBean { Collection topics = Lists.newArrayList(KafkaConstants.KafkaTopic); kafkaConsumer.subscribe(topics); while (true) { + System.out.println("开始消费数据,等待中..."); ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { //1.从ConsumerRecord中获取消费数据 String originalMsg = (String) consumerRecord.value(); log.info("从Kafka中消费的原始数据: " + originalMsg); //2.把消费数据转换为DTO对象 - IotDbData iotDbData = JSONUtil.toBean(originalMsg, IotDbData.class); - log.info("消费数据转换为DTO对象: " + iotDbData.toString()); + List dataList = JSONUtil.toList(originalMsg, KafkaData.class); + log.info("从Kafka中消费的实体数据: " + dataList); + service.strategyCheck(dataList); } } }); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java index 4482b98..c14a1f0 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java @@ -1,7 +1,9 @@ package com.muyu.data.processing.mapper; import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.KafkaData; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; @@ -20,18 +22,9 @@ import java.util.List; @Mapper public interface DataProcessingMapper{ - Integer createCarData(IotDbData data); - - Integer updateCarData(IotDbData data); - - Integer deleteCarData(String timestamp); - - Integer createCarDataGroup(); - - Integer createCarDataGroupElement(); - -// List queryCarData(); - List selectStorageGroup(); + Integer insIotDbData(@Param("key") String key, @Param("value") String value); + + void strategyCheck(@Param("dataList") List dataList); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java index 937bb17..591251b 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/DataProcessingService.java @@ -2,6 +2,7 @@ package com.muyu.data.processing.service; import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.KafkaData; import java.util.List; @@ -16,43 +17,7 @@ import java.util.List; */ public interface DataProcessingService{ - /** - * 创建车辆报文记录 - * - * @param data 数据 - * @return {@link Integer } - */ - Integer createCarData(IotDbData data); - /** - * 更新车辆报文记录 - * - * @param data 数据 - * @return {@link Integer } - */ - Integer updateCarData(IotDbData data); - - /** - * 删除车辆报文记录 - * - * @param timestamp 时间戳 - * @return {@link Integer } - */ - Integer deleteCarData(String timestamp); - - /** - * 创建车辆报文记录组 - * - * @return {@link Integer } - */ - Integer createCarDataGroup(); - - /** - * 查询顺序 - * - * @return {@link List }<{@link IotDbData }> - */ - List queryCarData(); /** * 选择存储组 * @@ -60,4 +25,7 @@ public interface DataProcessingService{ */ List selectStorageGroup(); + void strategyCheck(List dataList); + + Integer insIotDbData(String key, String value); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 7b6139a..886db94 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -3,12 +3,17 @@ package com.muyu.data.processing.service.impl; import javax.annotation.Resource; +import com.muyu.common.core.domain.Result; import com.muyu.data.processing.domain.IotDbData; +import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.strategy.root.RootStrategy; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; import com.muyu.data.processing.mapper.DataProcessingMapper; import com.muyu.data.processing.service.DataProcessingService; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -26,45 +31,30 @@ import java.util.List; public class DataProcessingServiceImpl implements DataProcessingService { @Resource private DataProcessingMapper mapper; - - @Override - public Integer createCarData(IotDbData data) { - return mapper.createCarData(data); - } - - @Override - public Integer updateCarData(IotDbData data) { - return mapper.updateCarData(data); - } - - @Override - public Integer deleteCarData(String timestamp) { - return mapper.deleteCarData(timestamp); - } - - @Override - public Integer createCarDataGroup() { - try { - Integer flag = mapper.createCarDataGroup(); - Integer flagEle = mapper.createCarDataGroupElement(); - System.out.println("\n\t执行sql数量为{}" + flagEle); - return flagEle; - } catch (Exception e) { - e.printStackTrace(); - } - return 0; - } - - @Override - public List queryCarData() { -// return mapper.queryCarData(); - return null; - } + @Resource + private RootStrategy rootStrategy; @Override public List selectStorageGroup() { return mapper.selectStorageGroup(); } + @Override + public void strategyCheck(List dataList) { + HashMap kafkaDataHashMap = new HashMap<>(); + dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data)); + Result result = rootStrategy.applyStrategy(kafkaDataHashMap); + String[] data = result.getData(); + insIotDbData(data[0],data[1]); + +// dataList.forEach(KafkaData::setValueClass); +// mapper.strategyCheck(dataList); + } + + @Override + public Integer insIotDbData(String key, String value) { + return mapper.insIotDbData(key, value); + } + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/OneBranchStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/OneBranchStrategy.java deleted file mode 100644 index aef6611..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/OneBranchStrategy.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.muyu.data.processing.strategy.branch; - -import com.muyu.data.processing.domain.StrategyEums; -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import com.muyu.data.processing.strategy.abstractStrategyRouter; -import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 1号分支策略方法实现 - * - * @Author: 胡杨 - * @Name: OneBranchStrategy - * @Description: 1号叶子策略方法实现 - * @CreatedDate: 2024/9/28 上午11:50 - * @FilePath: com.muyu.data.processing.strategy.impl - */ - -@Slf4j -@Component -public class OneBranchStrategy extends abstractStrategyRouter implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("1号分支策略方法实现,参数1:{},参数2:{},执行方法:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2()); - return applyStrategy(testReq); - } - - @Override - protected StrategyMapper registerStrategy() { - return param -> StrategyEums.getStrategy(param.getType2()); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/TwoBranchStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/TwoBranchStrategy.java deleted file mode 100644 index eea73af..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/branch/TwoBranchStrategy.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.muyu.data.processing.strategy.branch; - -import com.muyu.data.processing.domain.StrategyEums; -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import com.muyu.data.processing.strategy.abstractStrategyRouter; -import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 2号分支策略方法实现 - * - * @Author: 胡杨 - * @Name: TwoBranchStrategy - * @Description: 1号叶子策略方法实现 - * @CreatedDate: 2024/9/28 上午11:50 - * @FilePath: com.muyu.data.processing.strategy.impl - */ - -@Slf4j -@Component -public class TwoBranchStrategy extends abstractStrategyRouter implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("2号分支策略方法实现,参数1:{},参数2:{},执行方法:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2()); - return applyStrategy(testReq); - } - - @Override - protected StrategyMapper registerStrategy() { - return param -> StrategyEums.getStrategy(param.getType2()); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FourLeavesStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FourLeavesStrategy.java deleted file mode 100644 index 001ec8a..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/FourLeavesStrategy.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.muyu.data.processing.strategy.leaves; - -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 4号处理者 - * - * @Author: 胡杨 - * @Name: FourLeavesStrategy - * @Description: 4号处理者 - * @CreatedDate: 2024/9/28 上午11:54 - * @FilePath: com.muyu.data.processing.strategy.leaves - */ - -@Slf4j -@Component -public class FourLeavesStrategy implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("4号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()*1.0/testReq.getTwo())); - return new TestResp("执行4号处理者-除法"); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java new file mode 100644 index 0000000..5763eb6 --- /dev/null +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java @@ -0,0 +1,46 @@ +package com.muyu.data.processing.strategy.leaves; + +import com.muyu.common.core.domain.Result; +import com.muyu.common.core.enums.ClassType; +import com.muyu.data.processing.domain.KafkaData; +import com.muyu.data.processing.service.DataProcessingService; +import com.muyu.data.processing.service.impl.DataProcessingServiceImpl; +import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.data.processing.utils.DataUtils; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Controller; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +/** + * 策略-时序数据库新增车辆信息 + * + * @Author: 胡杨 + * @Name: InsIotDbStrategy + * @Description: 新增时序数据库类 + * @CreatedDate: 2024/9/28 下午8:57 + * @FilePath: com.muyu.data.processing.strategy.leaves + */ +@Component +public class InsIotDbStrategy implements StrategyHandler, Result> { + + @Override + public Result apply(HashMap map) { + StringBuffer key = new StringBuffer("timestamp,"); + StringBuffer value = new StringBuffer(System.currentTimeMillis()+","); + ArrayList keys = new ArrayList<>(map.keySet()); + keys.forEach(k -> { + key.append(k).append(","); + value.append(DataUtils.convert(map.get(k).getValue(), ClassType.getInfo(map.get(k).getType()))).append(","); + }); +// return Result.success(insIotDbData(key.toString(),value.toString())); + return Result.success(new String[]{ + key.subSequence(0,key.length()-1).toString(), + value.subSequence(0,value.length()-1).toString() + }); + } +} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/OneLeavesStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/OneLeavesStrategy.java deleted file mode 100644 index 9599332..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/OneLeavesStrategy.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.muyu.data.processing.strategy.leaves; - -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import com.muyu.data.processing.strategy.abstractStrategyRouter; -import com.muyu.data.processing.strategy.branch.OneBranchStrategy; -import com.muyu.data.processing.strategy.branch.TwoBranchStrategy; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 1号处理者 - * - * @Author: 胡杨 - * @Name: OneLeavesStrategy - * @Description: 1号处理者 - * @CreatedDate: 2024/9/28 上午11:54 - * @FilePath: com.muyu.data.processing.strategy.leaves - */ - -@Slf4j -@Component -public class OneLeavesStrategy implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("1号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()+testReq.getTwo())); - return new TestResp("执行1号处理者-加法"); - } - -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/ThreeLeavesStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/ThreeLeavesStrategy.java deleted file mode 100644 index c28a57b..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/ThreeLeavesStrategy.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.muyu.data.processing.strategy.leaves; - -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 3号处理者 - * - * @Author: 胡杨 - * @Name: ThreeLeavesStrategy - * @Description: 3号处理者 - * @CreatedDate: 2024/9/28 上午11:54 - * @FilePath: com.muyu.data.processing.strategy.leaves - */ - -@Slf4j -@Component -public class ThreeLeavesStrategy implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("3号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()*testReq.getTwo())); - return new TestResp("执行3号处理者-乘法"); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/TwoLeavesStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/TwoLeavesStrategy.java deleted file mode 100644 index 1174ab7..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/TwoLeavesStrategy.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.muyu.data.processing.strategy.leaves; - -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 2号处理者 - * - * @Author: 胡杨 - * @Name: TwoLeavesStrategy - * @Description: 2号处理者 - * @CreatedDate: 2024/9/28 上午11:54 - * @FilePath: com.muyu.data.processing.strategy.leaves - */ - -@Slf4j -@Component -public class TwoLeavesStrategy implements StrategyHandler { - @Override - public TestResp apply(TestReq testReq) { - log.info("2号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()-testReq.getTwo())); - return new TestResp("执行2号处理者-减法"); - } -} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/root/RootStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/root/RootStrategy.java index 9df6ce2..55c39fc 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/root/RootStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/root/RootStrategy.java @@ -1,20 +1,14 @@ package com.muyu.data.processing.strategy.root; -import com.muyu.common.core.utils.StringUtils; -import com.muyu.data.processing.domain.StrategyEums; -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.StrategyHandler; +import com.muyu.common.core.domain.Result; +import com.muyu.data.processing.domain.KafkaData; import com.muyu.data.processing.strategy.abstractStrategyRouter; -import com.muyu.data.processing.strategy.branch.OneBranchStrategy; -import com.muyu.data.processing.strategy.branch.TwoBranchStrategy; -import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy; -import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy; +import com.muyu.data.processing.strategy.leaves.InsIotDbStrategy; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.HashMap; + /** * 策略路由实现 * @@ -27,10 +21,10 @@ import org.springframework.stereotype.Component; @Slf4j @Component -public class RootStrategy extends abstractStrategyRouter { +public class RootStrategy extends abstractStrategyRouter, Result> { @Override - protected StrategyMapper registerStrategy() { - return param -> StrategyEums.getStrategy(param.getType1()); + protected StrategyMapper , Result> registerStrategy() { + return param -> new InsIotDbStrategy(); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java new file mode 100644 index 0000000..d65ee96 --- /dev/null +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java @@ -0,0 +1,24 @@ +package com.muyu.data.processing.utils; + +import org.springframework.stereotype.Component; + +/** + * 数据处理工具类 + * + * @Author: 胡杨 + * @Name: DataUtils + * @Description: 数据处理工具类 + * @CreatedDate: 2024/9/29 上午10:15 + * @FilePath: com.muyu.data.processing.utils + */ + +@Component +public class DataUtils { + public static T convert(Object data, Class type) { + if (type.isInstance(data)) { + return type.cast(data); + } else { + throw new IllegalArgumentException("数据 "+data+" 类型不匹配"); + } + } +} diff --git a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml index fab3dd1..7a16e4b 100644 --- a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml +++ b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml @@ -2,25 +2,27 @@ - - insert into root.one.data(timestamp, CarData_id, CarData_num, CarData_name,create_time) values(#{timestamp},#{CarDataId},#{CarDataNum},#{CarDataName},#{createTime}); - - - delete from root.one.data where timestamp = ${timestamp}; - - - insert into root.one.data(timestamp, CarData_id, CarData_num, CarData_name,create_time) values(2021-11-24T18:28:20.689+08:00,#{CarDataId},#{CarDataNum},#{CarDataName},#{createTime}); + + insert into root.one.data(${key}) values(${value}); + + + insert into root.one.data + ( + + ${data.key} + + ) values + ( + + #{data.value} + + ) + - - SET STORAGE GROUP TO root.one.data - - - CREATE TIMESERIES root.one.data.CarData_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY; -