完成kafka,iotdb测试
parent
2c1765b3a6
commit
b7351ee49e
|
@ -0,0 +1,63 @@
|
|||
package com.muyu.common.core.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 类型枚举
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: ClassType
|
||||
* @Description: 类型枚举
|
||||
* @CreatedDate: 2024/9/29 上午9:28
|
||||
* @FilePath: com.muyu.common.core.enums
|
||||
*/
|
||||
|
||||
@Getter
|
||||
public enum ClassType {
|
||||
BYTE("byte", byte.class),
|
||||
SHORT("short", short.class),
|
||||
INT("int", int.class),
|
||||
LONG("long", long.class),
|
||||
FLOAT("float", float.class),
|
||||
DOUBLE("double", double.class),
|
||||
BOOLEAN("boolean", boolean.class),
|
||||
CHAR("char", char.class),
|
||||
STRING("String", String.class),
|
||||
SET("Set", Set.class),
|
||||
MAP("Map", Map.class),
|
||||
LIST("List", List.class);
|
||||
|
||||
private final String code;
|
||||
private final Class<?> info;
|
||||
|
||||
ClassType(String code, Class<?> info) {
|
||||
this.code = code;
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 鉴别参数是否是枚举的值
|
||||
*
|
||||
* @param code 需鉴别参数
|
||||
* @return 如果存在返回结果turn, 否则返回false
|
||||
*/
|
||||
public static boolean isCode(String code) {
|
||||
return Arrays.stream(values())
|
||||
.map(ClassType::getCode)
|
||||
.anyMatch(c -> c.equals(code));
|
||||
}
|
||||
|
||||
|
||||
public static Class<?> getInfo(String code) {
|
||||
return Arrays.stream(values())
|
||||
.filter(c -> c.getCode().equals(code))
|
||||
.findFirst()
|
||||
.map(ClassType::getInfo)
|
||||
.orElse(null);
|
||||
}
|
||||
}
|
|
@ -8,7 +8,7 @@ package com.muyu.common.kafka.constants;
|
|||
*/
|
||||
public class KafkaConstants {
|
||||
|
||||
public final static String KafkaTopic = "kafka_topic";
|
||||
public final static String KafkaTopic = "kafka_topic2";
|
||||
|
||||
public final static String KafkaGrop = "kafka_grop";
|
||||
public final static String KafkaGrop = "kafka_grop2";
|
||||
}
|
||||
|
|
|
@ -4,11 +4,13 @@ import com.muyu.common.kafka.constants.KafkaConstants;
|
|||
|
||||
import com.muyu.common.security.annotation.EnableCustomConfig;
|
||||
import com.muyu.common.security.annotation.EnableMyFeignClients;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
|
||||
|
||||
/**
|
||||
* 数据处理模块启动器
|
||||
* @Author: 胡杨
|
||||
* @Name: MyData
|
||||
* @Description: 数据处理模块启动器
|
||||
|
|
|
@ -29,112 +29,6 @@ public class DataProcessingController {
|
|||
@Resource
|
||||
private DataProcessingService service;
|
||||
|
||||
|
||||
|
||||
@RequestMapping(value = "/createCarData", method = RequestMethod.POST)
|
||||
// @Transactional(rollbackFor = Exception.class)
|
||||
public Result createCarData(@RequestBody IotDbData data) {
|
||||
try {
|
||||
data.setTimestamp(System.currentTimeMillis());
|
||||
data.setCreateTime(new Date());
|
||||
Integer v = service.createCarData(data);
|
||||
if (v == -1) {
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("创建车辆报文记录失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 更新操作 其实也是插入操作 时间戳相同 只和时间戳相关
|
||||
* @param data
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/updateCarData", method = RequestMethod.POST)
|
||||
public Result updateCarData(@RequestBody IotDbData data) {
|
||||
try {
|
||||
data.setTimestamp(System.currentTimeMillis());
|
||||
data.setCreateTime(new Date());
|
||||
Integer v = service.updateCarData(data);
|
||||
if (v == -1) {
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("更新车辆报文记录失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除操作 要将时间戳的加号变成%2B
|
||||
* @param timestamp
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/deleteCarData", method = RequestMethod.GET)
|
||||
public Result deleteCarData(String timestamp) {
|
||||
try {
|
||||
Integer v = service.deleteCarData(timestamp);
|
||||
if (v == -1) {
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("删除车辆报文记录失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 创建组 也就是相当于数据库
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/createCarDataGroup", method = RequestMethod.POST)
|
||||
public Result createCarDataGroup() {
|
||||
try {
|
||||
Integer v = service.createCarDataGroup();
|
||||
if (v > 0) {
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("创建车辆报文记录组失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询所有车辆报文记录数据
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/queryCarData", method = RequestMethod.GET)
|
||||
public Result queryCarData() {
|
||||
try {
|
||||
List<IotDbData> v = service.queryCarData();
|
||||
if (v.size() > 0) {
|
||||
v.forEach(x -> {
|
||||
System.out.println(x.toString());
|
||||
});
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("查询车辆报文记录组失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查看数据库有多少组
|
||||
*
|
||||
|
@ -142,19 +36,14 @@ public class DataProcessingController {
|
|||
*/
|
||||
@RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET)
|
||||
public Result selectStorageGroup() {
|
||||
try {
|
||||
List<String> v = service.selectStorageGroup();
|
||||
if (v.size() > 0) {
|
||||
v.forEach(x -> {
|
||||
List<String> v = service.selectStorageGroup();if (v.size() > 0) {v.forEach(x -> {
|
||||
System.out.println("group------------------" + x.toString());
|
||||
});
|
||||
return Result.success(v);
|
||||
} else {
|
||||
return Result.error(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("查询组失败!" + e);
|
||||
return Result.error(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
package com.muyu.data.processing.controller;
|
||||
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.muyu.common.kafka.constants.KafkaConstants;
|
||||
import com.muyu.data.processing.domain.IotDbData;
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.root.RootStrategy;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -32,30 +29,44 @@ public class TestController {
|
|||
@GetMapping("/testKafka")
|
||||
public void sendMsg(@RequestParam("msg") String msg) {
|
||||
try {
|
||||
IotDbData iotDbData = IotDbData.builder()
|
||||
.timestamp(System.currentTimeMillis())
|
||||
.vin("vin666")
|
||||
.key("test")
|
||||
.label("测试数据")
|
||||
.value("Kafka测试")
|
||||
.type("String")
|
||||
.build();
|
||||
String jsonString = JSONObject.toJSONString(iotDbData);
|
||||
// 测试数据
|
||||
String jsonString = """
|
||||
[{
|
||||
"key": "vin",
|
||||
"label": "VIN码",
|
||||
"type": "String",
|
||||
"value": "vin131413534474"
|
||||
},{
|
||||
"key": "timestamp",
|
||||
"label": "时间戳",
|
||||
"type": "long",
|
||||
"value": 1727534036893L
|
||||
},{
|
||||
"key": "latitude",
|
||||
"label": "纬度",
|
||||
"type": "float",
|
||||
"value": 66.898F
|
||||
},{
|
||||
"key": "longitude",
|
||||
"label": "经度",
|
||||
"type": "float",
|
||||
"value": 99.124F
|
||||
}]""";
|
||||
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
|
||||
kafkaProducer.send(producerRecord);
|
||||
System.out.println("同步消息发送成功: " + msg);
|
||||
System.out.println("同步消息发送成功: " + jsonString);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
System.out.println("同步消息发送失败: " + msg);
|
||||
System.out.println("同步消息发送失败");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Resource
|
||||
private RootStrategy rootStrategy;
|
||||
|
||||
@PostMapping("/testStrategy")
|
||||
public TestResp testStrategy(@RequestBody TestReq testReq) {
|
||||
return rootStrategy.applyStrategy(testReq);
|
||||
}
|
||||
// @Resource
|
||||
// private RootStrategy rootStrategy;
|
||||
//
|
||||
// @PostMapping("/testStrategy")
|
||||
// public TestResp testStrategy(@RequestBody TestReq testReq) {
|
||||
// return rootStrategy.applyStrategy(testReq);
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package com.muyu.data.processing.domain;
|
||||
|
||||
import com.muyu.common.core.enums.ClassType;
|
||||
import lombok.*;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 报文信息 时序实体类
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: DataProcessing
|
||||
* @Description: 报文信息 时序实体类
|
||||
* @CreatedDate: 2024/9/28 下午3:48
|
||||
* @FilePath: com.muyu.data.processing.domain
|
||||
*/
|
||||
|
||||
@Data
|
||||
@ToString
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class KafkaData implements Serializable {
|
||||
|
||||
private String key;
|
||||
private String label;
|
||||
private Object value;
|
||||
private String type;
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){
|
||||
setValueClass();
|
||||
}
|
||||
}
|
||||
|
||||
public void setValue(Object value) {
|
||||
this.value = value;
|
||||
if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){
|
||||
setValueClass();
|
||||
}
|
||||
}
|
||||
|
||||
public void setValueClass() {
|
||||
Class<?> info = ClassType.getInfo(type);
|
||||
if (info.isInstance(value)){
|
||||
value = info.cast(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,62 +0,0 @@
|
|||
package com.muyu.data.processing.domain;
|
||||
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.data.processing.strategy.branch.OneBranchStrategy;
|
||||
import com.muyu.data.processing.strategy.branch.TwoBranchStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* 策略选择枚举
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: StrategyEums
|
||||
* @Description: 策略选择枚举
|
||||
* @CreatedDate: 2024/9/28 上午11:59
|
||||
* @FilePath: com.muyu.data.processing.domain
|
||||
*/
|
||||
|
||||
@Getter
|
||||
public enum StrategyEums {
|
||||
TEST1("加减法", new OneBranchStrategy()),
|
||||
TEST2("乘除法", new TwoBranchStrategy()),
|
||||
TEST1_1("加法", new OneLeavesStrategy()),
|
||||
TEST1_2("减法", new TwoLeavesStrategy()),
|
||||
TEST2_1("乘法", new ThreeLeavesStrategy()),
|
||||
TEST2_2("除法", new FourLeavesStrategy());
|
||||
|
||||
private final String code;
|
||||
private final StrategyHandler<TestReq, TestResp> info;
|
||||
|
||||
StrategyEums(String code, StrategyHandler<TestReq, TestResp> info) {
|
||||
this.code = code;
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
/**
|
||||
* 鉴别参数是否是枚举的值
|
||||
*
|
||||
* @param code 需鉴别参数
|
||||
* @return 如果存在返回结果turn, 否则返回false
|
||||
*/
|
||||
public static boolean isCode(String code) {
|
||||
return Arrays.stream(values())
|
||||
.map(StrategyEums::getCode)
|
||||
.anyMatch(c -> c.equals(code));
|
||||
}
|
||||
|
||||
public static StrategyHandler<TestReq, TestResp> getStrategy(String code) {
|
||||
return Arrays.stream(values())
|
||||
.filter(c -> c.getCode().equals(code))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IllegalArgumentException("参数错误"))
|
||||
.getInfo();
|
||||
}
|
||||
}
|
|
@ -5,7 +5,8 @@ import cn.hutool.core.thread.ThreadUtil;
|
|||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
||||
import com.muyu.common.kafka.constants.KafkaConstants;
|
||||
import com.muyu.data.processing.domain.IotDbData;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
import com.muyu.data.processing.service.DataProcessingService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@ -16,8 +17,10 @@ import org.springframework.stereotype.Component;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* kafka消费者
|
||||
* @Author: 胡杨
|
||||
* @Name: KafkaConsumerService
|
||||
* @Description: kafka消费者
|
||||
|
@ -30,6 +33,8 @@ import java.util.Collection;
|
|||
public class KafkaConsumerService implements InitializingBean {
|
||||
@Resource
|
||||
private KafkaConsumer kafkaConsumer;
|
||||
@Resource
|
||||
private DataProcessingService service;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
@ -39,14 +44,16 @@ public class KafkaConsumerService implements InitializingBean {
|
|||
Collection<String> topics = Lists.newArrayList(KafkaConstants.KafkaTopic);
|
||||
kafkaConsumer.subscribe(topics);
|
||||
while (true) {
|
||||
System.out.println("开始消费数据,等待中...");
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord consumerRecord : consumerRecords) {
|
||||
//1.从ConsumerRecord中获取消费数据
|
||||
String originalMsg = (String) consumerRecord.value();
|
||||
log.info("从Kafka中消费的原始数据: " + originalMsg);
|
||||
//2.把消费数据转换为DTO对象
|
||||
IotDbData iotDbData = JSONUtil.toBean(originalMsg, IotDbData.class);
|
||||
log.info("消费数据转换为DTO对象: " + iotDbData.toString());
|
||||
List<KafkaData> dataList = JSONUtil.toList(originalMsg, KafkaData.class);
|
||||
log.info("从Kafka中消费的实体数据: " + dataList);
|
||||
service.strategyCheck(dataList);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package com.muyu.data.processing.mapper;
|
||||
|
||||
import com.muyu.data.processing.domain.IotDbData;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -20,18 +22,9 @@ import java.util.List;
|
|||
@Mapper
|
||||
public interface DataProcessingMapper{
|
||||
|
||||
Integer createCarData(IotDbData data);
|
||||
|
||||
Integer updateCarData(IotDbData data);
|
||||
|
||||
Integer deleteCarData(String timestamp);
|
||||
|
||||
Integer createCarDataGroup();
|
||||
|
||||
Integer createCarDataGroupElement();
|
||||
|
||||
// List<DataProcessing> queryCarData();
|
||||
|
||||
List<String> selectStorageGroup();
|
||||
|
||||
Integer insIotDbData(@Param("key") String key, @Param("value") String value);
|
||||
|
||||
void strategyCheck(@Param("dataList") List<KafkaData> dataList);
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.muyu.data.processing.service;
|
|||
|
||||
|
||||
import com.muyu.data.processing.domain.IotDbData;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -16,43 +17,7 @@ import java.util.List;
|
|||
*/
|
||||
|
||||
public interface DataProcessingService{
|
||||
/**
|
||||
* 创建车辆报文记录
|
||||
*
|
||||
* @param data 数据
|
||||
* @return {@link Integer }
|
||||
*/
|
||||
Integer createCarData(IotDbData data);
|
||||
|
||||
/**
|
||||
* 更新车辆报文记录
|
||||
*
|
||||
* @param data 数据
|
||||
* @return {@link Integer }
|
||||
*/
|
||||
Integer updateCarData(IotDbData data);
|
||||
|
||||
/**
|
||||
* 删除车辆报文记录
|
||||
*
|
||||
* @param timestamp 时间戳
|
||||
* @return {@link Integer }
|
||||
*/
|
||||
Integer deleteCarData(String timestamp);
|
||||
|
||||
/**
|
||||
* 创建车辆报文记录组
|
||||
*
|
||||
* @return {@link Integer }
|
||||
*/
|
||||
Integer createCarDataGroup();
|
||||
|
||||
/**
|
||||
* 查询顺序
|
||||
*
|
||||
* @return {@link List }<{@link IotDbData }>
|
||||
*/
|
||||
List<IotDbData> queryCarData();
|
||||
/**
|
||||
* 选择存储组
|
||||
*
|
||||
|
@ -60,4 +25,7 @@ public interface DataProcessingService{
|
|||
*/
|
||||
List<String> selectStorageGroup();
|
||||
|
||||
void strategyCheck(List<KafkaData> dataList);
|
||||
|
||||
Integer insIotDbData(String key, String value);
|
||||
}
|
||||
|
|
|
@ -3,12 +3,17 @@ package com.muyu.data.processing.service.impl;
|
|||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.data.processing.domain.IotDbData;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
import com.muyu.data.processing.strategy.root.RootStrategy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.muyu.data.processing.mapper.DataProcessingMapper;
|
||||
import com.muyu.data.processing.service.DataProcessingService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -26,45 +31,30 @@ import java.util.List;
|
|||
public class DataProcessingServiceImpl implements DataProcessingService {
|
||||
@Resource
|
||||
private DataProcessingMapper mapper;
|
||||
|
||||
@Override
|
||||
public Integer createCarData(IotDbData data) {
|
||||
return mapper.createCarData(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer updateCarData(IotDbData data) {
|
||||
return mapper.updateCarData(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer deleteCarData(String timestamp) {
|
||||
return mapper.deleteCarData(timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer createCarDataGroup() {
|
||||
try {
|
||||
Integer flag = mapper.createCarDataGroup();
|
||||
Integer flagEle = mapper.createCarDataGroupElement();
|
||||
System.out.println("\n\t执行sql数量为{}" + flagEle);
|
||||
return flagEle;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IotDbData> queryCarData() {
|
||||
// return mapper.queryCarData();
|
||||
return null;
|
||||
}
|
||||
@Resource
|
||||
private RootStrategy rootStrategy;
|
||||
|
||||
@Override
|
||||
public List<String> selectStorageGroup() {
|
||||
return mapper.selectStorageGroup();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void strategyCheck(List<KafkaData> dataList) {
|
||||
HashMap<String, KafkaData> kafkaDataHashMap = new HashMap<>();
|
||||
dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data));
|
||||
Result<String[]> result = rootStrategy.applyStrategy(kafkaDataHashMap);
|
||||
String[] data = result.getData();
|
||||
insIotDbData(data[0],data[1]);
|
||||
|
||||
// dataList.forEach(KafkaData::setValueClass);
|
||||
// mapper.strategyCheck(dataList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer insIotDbData(String key, String value) {
|
||||
return mapper.insIotDbData(key, value);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.branch;
|
||||
|
||||
import com.muyu.data.processing.domain.StrategyEums;
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||
import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 1号分支策略方法实现
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: OneBranchStrategy
|
||||
* @Description: 1号叶子策略方法实现
|
||||
* @CreatedDate: 2024/9/28 上午11:50
|
||||
* @FilePath: com.muyu.data.processing.strategy.impl
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class OneBranchStrategy extends abstractStrategyRouter<TestReq, TestResp> implements StrategyHandler<TestReq,TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("1号分支策略方法实现,参数1:{},参数2:{},执行方法:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2());
|
||||
return applyStrategy(testReq);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StrategyMapper<TestReq, TestResp> registerStrategy() {
|
||||
return param -> StrategyEums.getStrategy(param.getType2());
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.branch;
|
||||
|
||||
import com.muyu.data.processing.domain.StrategyEums;
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||
import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 2号分支策略方法实现
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: TwoBranchStrategy
|
||||
* @Description: 1号叶子策略方法实现
|
||||
* @CreatedDate: 2024/9/28 上午11:50
|
||||
* @FilePath: com.muyu.data.processing.strategy.impl
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TwoBranchStrategy extends abstractStrategyRouter<TestReq, TestResp> implements StrategyHandler<TestReq,TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("2号分支策略方法实现,参数1:{},参数2:{},执行方法:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2());
|
||||
return applyStrategy(testReq);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StrategyMapper<TestReq, TestResp> registerStrategy() {
|
||||
return param -> StrategyEums.getStrategy(param.getType2());
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.leaves;
|
||||
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 4号处理者
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: FourLeavesStrategy
|
||||
* @Description: 4号处理者
|
||||
* @CreatedDate: 2024/9/28 上午11:54
|
||||
* @FilePath: com.muyu.data.processing.strategy.leaves
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class FourLeavesStrategy implements StrategyHandler<TestReq, TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("4号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()*1.0/testReq.getTwo()));
|
||||
return new TestResp("执行4号处理者-除法");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.muyu.data.processing.strategy.leaves;
|
||||
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.common.core.enums.ClassType;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
import com.muyu.data.processing.service.DataProcessingService;
|
||||
import com.muyu.data.processing.service.impl.DataProcessingServiceImpl;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.data.processing.utils.DataUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 策略-时序数据库新增车辆信息
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: InsIotDbStrategy
|
||||
* @Description: 新增时序数据库类
|
||||
* @CreatedDate: 2024/9/28 下午8:57
|
||||
* @FilePath: com.muyu.data.processing.strategy.leaves
|
||||
*/
|
||||
@Component
|
||||
public class InsIotDbStrategy implements StrategyHandler<HashMap<String, KafkaData>, Result<String[]>> {
|
||||
|
||||
@Override
|
||||
public Result<String[]> apply(HashMap<String, KafkaData> map) {
|
||||
StringBuffer key = new StringBuffer("timestamp,");
|
||||
StringBuffer value = new StringBuffer(System.currentTimeMillis()+",");
|
||||
ArrayList<String> keys = new ArrayList<>(map.keySet());
|
||||
keys.forEach(k -> {
|
||||
key.append(k).append(",");
|
||||
value.append(DataUtils.convert(map.get(k).getValue(), ClassType.getInfo(map.get(k).getType()))).append(",");
|
||||
});
|
||||
// return Result.success(insIotDbData(key.toString(),value.toString()));
|
||||
return Result.success(new String[]{
|
||||
key.subSequence(0,key.length()-1).toString(),
|
||||
value.subSequence(0,value.length()-1).toString()
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.leaves;
|
||||
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||
import com.muyu.data.processing.strategy.branch.OneBranchStrategy;
|
||||
import com.muyu.data.processing.strategy.branch.TwoBranchStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 1号处理者
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: OneLeavesStrategy
|
||||
* @Description: 1号处理者
|
||||
* @CreatedDate: 2024/9/28 上午11:54
|
||||
* @FilePath: com.muyu.data.processing.strategy.leaves
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class OneLeavesStrategy implements StrategyHandler<TestReq, TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("1号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()+testReq.getTwo()));
|
||||
return new TestResp("执行1号处理者-加法");
|
||||
}
|
||||
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.leaves;
|
||||
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 3号处理者
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: ThreeLeavesStrategy
|
||||
* @Description: 3号处理者
|
||||
* @CreatedDate: 2024/9/28 上午11:54
|
||||
* @FilePath: com.muyu.data.processing.strategy.leaves
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ThreeLeavesStrategy implements StrategyHandler<TestReq, TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("3号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()*testReq.getTwo()));
|
||||
return new TestResp("执行3号处理者-乘法");
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
package com.muyu.data.processing.strategy.leaves;
|
||||
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 2号处理者
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: TwoLeavesStrategy
|
||||
* @Description: 2号处理者
|
||||
* @CreatedDate: 2024/9/28 上午11:54
|
||||
* @FilePath: com.muyu.data.processing.strategy.leaves
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TwoLeavesStrategy implements StrategyHandler<TestReq, TestResp> {
|
||||
@Override
|
||||
public TestResp apply(TestReq testReq) {
|
||||
log.info("2号处理者实现,参数1:{},参数2:{},执行方法:{},结果:{}", testReq.getOne(), testReq.getTwo(), testReq.getType2(), (testReq.getOne()-testReq.getTwo()));
|
||||
return new TestResp("执行2号处理者-减法");
|
||||
}
|
||||
}
|
|
@ -1,20 +1,14 @@
|
|||
package com.muyu.data.processing.strategy.root;
|
||||
|
||||
import com.muyu.common.core.utils.StringUtils;
|
||||
import com.muyu.data.processing.domain.StrategyEums;
|
||||
import com.muyu.data.processing.domain.req.TestReq;
|
||||
import com.muyu.data.processing.domain.resp.TestResp;
|
||||
import com.muyu.data.processing.strategy.StrategyHandler;
|
||||
import com.muyu.common.core.domain.Result;
|
||||
import com.muyu.data.processing.domain.KafkaData;
|
||||
import com.muyu.data.processing.strategy.abstractStrategyRouter;
|
||||
import com.muyu.data.processing.strategy.branch.OneBranchStrategy;
|
||||
import com.muyu.data.processing.strategy.branch.TwoBranchStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.FourLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.OneLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.ThreeLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.TwoLeavesStrategy;
|
||||
import com.muyu.data.processing.strategy.leaves.InsIotDbStrategy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* 策略路由实现
|
||||
*
|
||||
|
@ -27,10 +21,10 @@ import org.springframework.stereotype.Component;
|
|||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RootStrategy extends abstractStrategyRouter<TestReq, TestResp> {
|
||||
public class RootStrategy extends abstractStrategyRouter<HashMap<String, KafkaData>, Result<String[]>> {
|
||||
@Override
|
||||
protected StrategyMapper<TestReq , TestResp> registerStrategy() {
|
||||
return param -> StrategyEums.getStrategy(param.getType1());
|
||||
protected StrategyMapper<HashMap<String, KafkaData> , Result<String[]>> registerStrategy() {
|
||||
return param -> new InsIotDbStrategy();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
package com.muyu.data.processing.utils;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 数据处理工具类
|
||||
*
|
||||
* @Author: 胡杨
|
||||
* @Name: DataUtils
|
||||
* @Description: 数据处理工具类
|
||||
* @CreatedDate: 2024/9/29 上午10:15
|
||||
* @FilePath: com.muyu.data.processing.utils
|
||||
*/
|
||||
|
||||
@Component
|
||||
public class DataUtils {
|
||||
public static <T> T convert(Object data, Class<T> type) {
|
||||
if (type.isInstance(data)) {
|
||||
return type.cast(data);
|
||||
} else {
|
||||
throw new IllegalArgumentException("数据 "+data+" 类型不匹配");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,25 +2,27 @@
|
|||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.muyu.data.processing.mapper.DataProcessingMapper">
|
||||
|
||||
<insert id="createCarData" parameterType="com.muyu.data.processing.domain.IotDbData">
|
||||
insert into root.one.data(timestamp, CarData_id, CarData_num, CarData_name,create_time) values(#{timestamp},#{CarDataId},#{CarDataNum},#{CarDataName},#{createTime});
|
||||
</insert>
|
||||
<select id="selectStorageGroup" resultType="java.lang.String">
|
||||
show storage group
|
||||
</select>
|
||||
<delete id="deleteCarData" parameterType="java.lang.String">
|
||||
delete from root.one.data where timestamp = ${timestamp};
|
||||
</delete>
|
||||
|
||||
<insert id="updateCarData">
|
||||
insert into root.one.data(timestamp, CarData_id, CarData_num, CarData_name,create_time) values(2021-11-24T18:28:20.689+08:00,#{CarDataId},#{CarDataNum},#{CarDataName},#{createTime});
|
||||
<insert id="insIotDbData">
|
||||
insert into root.one.data(${key}) values(${value});
|
||||
</insert>
|
||||
<insert id="strategyCheck">
|
||||
insert into root.one.data
|
||||
(
|
||||
<foreach collection="dataList" item="data" separator=",">
|
||||
${data.key}
|
||||
</foreach>
|
||||
) values
|
||||
(
|
||||
<foreach collection="dataList" item="data" separator=",">
|
||||
#{data.value}
|
||||
</foreach>
|
||||
)
|
||||
|
||||
</insert>
|
||||
|
||||
<update id="createCarDataGroup">
|
||||
SET STORAGE GROUP TO root.one.data
|
||||
</update>
|
||||
<update id="createCarDataGroupElement">
|
||||
CREATE TIMESERIES root.one.data.CarData_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY;
|
||||
</update>
|
||||
|
||||
</mapper>
|
||||
|
|
Loading…
Reference in New Issue