diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java index df33d6b..8ccc11f 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/bean/CaffeineManager.java @@ -34,7 +34,7 @@ public class CaffeineManager { * @return 缓存管理器实例 */ @Bean - public CacheManager cacheManager() { + public SimpleCacheManager simpleCacheManager() { SimpleCacheManager cacheManager = new SimpleCacheManager(); List cacheNames = CacheNameEnums.getCodes(); cacheManager.setCaches(cacheNames.stream() diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index b4aad1f..24bf4d9 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -1,6 +1,7 @@ package com.muyu.data.processing.controller; +import com.github.benmanes.caffeine.cache.Caffeine; import com.muyu.common.caffeine.enums.CacheNameEnums; import com.muyu.common.core.utils.uuid.UUID; import com.muyu.common.iotdb.config.IotDBConfig; @@ -12,10 +13,14 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; +import org.springframework.cache.caffeine.CaffeineCache; +import org.springframework.cache.support.SimpleCacheManager; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; + /** * 测试控制层 * @Author: 胡杨 @@ -40,7 +45,7 @@ public class TestController { // private CaffeineCacheUtils cacheUtils; @Resource - private CacheManager cacheManager; + private SimpleCacheManager cacheManager; @GetMapping("/testKafka") public void sendMsg() { @@ -143,4 +148,19 @@ public class TestController { log.info("无缓存"); } } + + + @GetMapping("/testAddCache") + public void testAddCache(@RequestParam("vin") String vin) { + ArrayList caches = new ArrayList<>(); + caches.add(new CaffeineCache(vin, Caffeine.newBuilder().recordStats().build())); + cacheManager.setCaches(caches); + log.info("缓存管理器创建新分区: {}", vin); + } + + @GetMapping("/testGetCacheNames") + public void testGetCacheNames() { + cacheManager.initializeCaches(); + log.info("缓存分区列表: {}", cacheManager.getCacheNames()); + } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java index bc275a0..c2fdfb9 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/CarData.java @@ -19,7 +19,7 @@ import lombok.*; @AllArgsConstructor public class CarData { private String vin; - private String timestamp; + private long timestamp; private String latitude; private String longitude; } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 0f9da8e..5f3b23c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -10,11 +10,14 @@ import com.muyu.data.processing.domain.BasicData; import com.muyu.data.processing.strategy.core.StartStrategy; import org.apache.iotdb.isession.SessionDataSet;import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; import com.muyu.data.processing.mapper.DataProcessingMapper; import com.muyu.data.processing.service.DataProcessingService; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -67,17 +70,35 @@ public class DataProcessingServiceImpl implements DataProcessingService { @Override public List selectCarData(String firmCode, String vin) { - String tableName = "root.one."+firmCode+"."+vin; + ArrayList carDataList = new ArrayList<>(); + String sql = "select * from root.one."+firmCode+"."+vin; try { - SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(tableName); + SessionDataSet sessionDataSet = iotDBConfig.iotSession().executeQueryStatement(sql); List columnNames = sessionDataSet.getColumnNames(); - System.out.println(columnNames); - } catch (StatementExecutionException e) { - throw new RuntimeException(e); - } catch (IoTDBConnectionException e) { + while (sessionDataSet.hasNext()){ + RowRecord next = sessionDataSet.next(); + CarData data = getCarData(vin, next, columnNames); + carDataList.add(data); + } + } catch (Exception e) { throw new RuntimeException(e); } - return null; + return carDataList; + } + + private static CarData getCarData(String vin, RowRecord next, List columnNames) { + List fields = next.getFields(); + CarData data = new CarData(); + data.setVin(vin); + data.setTimestamp(next.getTimestamp()); + for (int i = 0; i < columnNames.size(); i++) { + if (columnNames.get(i).contains("latitude")) { + data.setLatitude(fields.get(i-1).getStringValue()); + }else if (columnNames.get(i).contains("longitude")) { + data.setLongitude(fields.get(i-1).getStringValue()); + } + } + return data; }