test: 基础测试

dev.data.processing
面包骑士 2024-10-04 15:13:03 +08:00
parent 854df0aab5
commit fcb6f81a12
13 changed files with 88 additions and 21 deletions

View File

@ -17,11 +17,11 @@ import java.util.List;
@Getter
public enum CacheNameEnums {
STORAGE("storage", "持久化"),
FAULT("fault", "故障"),
FENCE("fence", "围栏"),
STORAGE("storage", "持久化"),
REALTIME("realTime", "实时信息"),
WARMING("warming", "预警");
WARMING("warming", "预警"),
REALTIME("realTime", "实时信息");
private final String code;
private final String info;

View File

@ -1,6 +1,7 @@
package com.muyu.common.iotdb.config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@ -34,13 +35,19 @@ public class IotDBConfig {
@Value("${spring.iotdb.fetchSize}")
private int fetchSize;
private static Session session;
@Bean
public Session iotSession(){
Session session = new Session(ip, port, user, password, fetchSize);
try {
session.open();
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
if (session == null) {
session = new Session(ip, port, user, password, fetchSize);
try {
session.open();
session.setTimeZone("+08:00");
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
}
}
return session;
}

View File

@ -8,7 +8,7 @@ package com.muyu.common.kafka.constants;
*/
public class KafkaConstants {
public final static String KafkaTopic = "kafka_topic2";
public final static String KafkaTopic = "kafka_topic";
public final static String KafkaGrop = "kafka_grop2";
public final static String KafkaGrop = "kafka_grop";
}

View File

@ -1,6 +1,7 @@
package com.muyu.data.processing.controller;
import com.muyu.common.core.domain.Result;
import com.muyu.common.security.utils.SecurityUtils;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.service.DataProcessingService;
@ -34,7 +35,7 @@ public class DataProcessingController {
*
* @return
*/
@RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET)
@GetMapping("/selectStorageGroup")
public Result selectStorageGroup() {
List<String> v = service.selectStorageGroup();if (v.size() > 0) {v.forEach(x -> {
System.out.println("group------------------" + x.toString());
@ -45,5 +46,12 @@ public class DataProcessingController {
}
}
@GetMapping("/selectCarData")
public Result selectCarData(@RequestParam("vin") String vin) {
// String firmCode = SecurityUtils.getSaasKey();
String firmCode = "firm01";
return Result.success(service.selectCarData(firmCode,vin));
}
}

View File

@ -43,7 +43,7 @@ public class TestController {
private CacheManager cacheManager;
@GetMapping("/testKafka")
public void sendMsg(@RequestParam("msg") String msg) {
public void sendMsg() {
try {
// 测试数据
String jsonString = """

View File

@ -0,0 +1,25 @@
package com.muyu.data.processing.domain;
import lombok.*;
/**
*
*
* @Author:
* @Name: CarData
* @Description:
* @CreatedDate: 2024/10/2 2:34
* @FilePath: com.muyu.data.processing.domain
*/
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class CarData {
private String vin;
private String timestamp;
private String latitude;
private String longitude;
}

View File

@ -1,5 +1,6 @@
package com.muyu.data.processing.mapper;
import com.muyu.data.processing.domain.CarData;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.domain.BasicData;
import org.apache.ibatis.annotations.Mapper;
@ -29,4 +30,6 @@ public interface DataProcessingMapper{
void strategyCheck(@Param("dataList") List<BasicData> dataList);
Integer insIotDbDataVo(IotDbData build);
List<CarData> selectCarData(@Param("tableName") String tableName);
}

View File

@ -1,6 +1,7 @@
package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.enums.CacheNameEnums;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;

View File

@ -1,6 +1,7 @@
package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.enums.CacheNameEnums;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
@ -34,9 +35,6 @@ public class GoOnlineRabbitConsumer {
@Resource
private CacheManager cacheManager;
@Value("#{'${cacheNames}'.split(',')}")
private List<String> cacheNames;
@RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)})
public void goOnline(String vin, Message message, Channel channel){
log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
@ -66,7 +64,7 @@ public class GoOnlineRabbitConsumer {
*/
public void addCarCache(String vin) {
// 从Redis中获取缓存信息
for (String name : cacheNames) {
for (String name : CacheNameEnums.getCodes()) {
String value = redisTemplate.opsForValue().get(name+":"+vin);
cacheManager.getCache(name).put(vin, value);
log.info("存储缓存, 缓存分区:[{}], 车辆编码:[{}], 存储值:[{}]", name, vin, value);

View File

@ -2,6 +2,7 @@ package com.muyu.data.processing.service;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.domain.CarData;
import java.util.List;
@ -27,4 +28,6 @@ public interface DataProcessingService{
void strategyCheck(List<BasicData> dataList);
Integer insIotDbData(String key, String value);
List<CarData> selectCarData(String firmCode, String vin);
}

View File

@ -3,9 +3,13 @@ package com.muyu.data.processing.service.impl;
import javax.annotation.Resource;
import com.muyu.common.iotdb.config.IotDBConfig;
import com.muyu.data.processing.domain.CarData;
import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.domain.BasicData;
import com.muyu.data.processing.strategy.core.StartStrategy;
import org.apache.iotdb.isession.SessionDataSet;import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import com.muyu.data.processing.mapper.DataProcessingMapper;
@ -30,7 +34,7 @@ public class DataProcessingServiceImpl implements DataProcessingService {
@Resource
private DataProcessingMapper mapper;
@Resource
private StartStrategy rootStrategy;
private IotDBConfig iotDBConfig;
@Override
@ -61,5 +65,20 @@ public class DataProcessingServiceImpl implements DataProcessingService {
return mapper.insIotDbData(key, value);
}
@Override
public List<CarData> selectCarData(String firmCode, String vin) {
String tableName = "root.one."+firmCode+"."+vin;
try {
SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(tableName);
List<String> columnNames = sessionDataSet.getColumnNames();
System.out.println(columnNames);
} catch (StatementExecutionException e) {
throw new RuntimeException(e);
} catch (IoTDBConnectionException e) {
throw new RuntimeException(e);
}
return null;
}
}

View File

@ -32,9 +32,9 @@ public class BasicStrategy extends abstractStrategyRouter<HashMap<String, BasicD
@Override
public Temporary2 apply(HashMap<String, BasicData> basicDataMap) {
log.info("开始执行基础校验节点。。。");
basicDataMap.put(CacheNameEnums.WARMING.getCode(), BasicData.builder().key("test").build());
basicDataMap.put(CacheNameEnums.FAULT.getCode(), BasicData.builder().key("test").build());
basicDataMap.put(CacheNameEnums.REALTIME.getCode(), BasicData.builder().key("test").build());
basicDataMap.put(CacheNameEnums.WARMING.getCode(), null);
basicDataMap.put(CacheNameEnums.FAULT.getCode(), null);
basicDataMap.put(CacheNameEnums.REALTIME.getCode(), null);
log.info("基础校验节点已通过。。。");
return applyStrategy(basicDataMap);
}

View File

@ -6,6 +6,10 @@
show storage group
</select>
<select id="selectCarData" resultType="com.muyu.data.processing.domain.CarData">
select * from ${tableName};
</select>
<insert id="insIotDbData">
insert into root.one.data(${key}) values(${value});
</insert>
@ -24,7 +28,6 @@
</insert>
<insert id="insIotDbDataVo">
insert into
root.one.data
(timestamp, vin, latitude,longitude)