diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java new file mode 100644 index 0000000..da94c0b --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ConfirmCallbackConfig.java @@ -0,0 +1,48 @@ +package com.muyu.common.rabbit.config; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送确认配置 + * 消息发送到交换机的回调 + */ +@Component +@Log4j2 +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this); + } + + /** + * 交换机不管是否收到消息的一个回调方法 + * + * @param correlationData 消息相关数据 + * @param ack 交换机是否收到消息 + * @param cause 失败原因 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (!ack) { + String exchange = correlationData.getReturned().getExchange(); + String message = correlationData.getReturned().getMessage().getBody().toString(); + // 发送异常 + log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause); + // TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL + } + } + +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfigurer.java similarity index 97% rename from cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java rename to cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfigurer.java index af09bfd..ba95bbb 100644 --- a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/RabbitListenerConfigurer.java +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/RabbitListenerConfigurer.java @@ -1,4 +1,4 @@ -package com.muyu.common.rabbit; +package com.muyu.common.rabbit.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java new file mode 100644 index 0000000..a457eb0 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/config/ReturnCallbackConfig.java @@ -0,0 +1,41 @@ +package com.muyu.common.rabbit.config; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送到队列的确认 + */ +@Component +@Log4j2 +public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送失败 则会执行这个方法 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("消息:{},被交换机:{} 回退!退回原因为:{}", + returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } + +} diff --git a/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java new file mode 100644 index 0000000..70c7620 --- /dev/null +++ b/cloud-common/cloud-common-rabbit/src/main/java/com/muyu/common/rabbit/constants/RabbitConstants.java @@ -0,0 +1,15 @@ +package com.muyu.common.rabbit.constants; + +/** + * rabbit常量 + * @Author: 胡杨 + * @date: 2024/7/10 + * @Description: rabbit常量 + * @Version 1.0.0 + */ +public class RabbitConstants { + + public final static String GO_ONLINE_QUEUE= "GoOnline"; + + public final static String DOWNLINE_QUEUE= "Downline"; +} diff --git a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 189ea2c..3c60088 100644 --- a/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/cloud-common/cloud-common-rabbit/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1 @@ -com.muyu.common.rabbit.RabbitListenerConfigurer \ No newline at end of file +#com.muyu.common.rabbit.config.RabbitListenerConfigurer diff --git a/cloud-data-processing/pom.xml b/cloud-data-processing/pom.xml index 0f006a2..af13f3f 100644 --- a/cloud-data-processing/pom.xml +++ b/cloud-data-processing/pom.xml @@ -81,7 +81,7 @@ com.alibaba druid-spring-boot-starter - 1.1.9 + 1.2.20 @@ -96,19 +96,25 @@ 3.5.5 - - - - + + + - - - - - - + + + com.alibaba + druid-spring-boot-3-starter + ${druid.version} + + + + + com.baomidou + dynamic-datasource-spring-boot3-starter + ${dynamic-ds.version} + diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java index a527785..bb658b0 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java @@ -5,6 +5,7 @@ import com.muyu.common.kafka.constants.KafkaConstants; import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import jakarta.annotation.PostConstruct; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; @@ -17,7 +18,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; * @CreatedDate: 2024/9/26 下午7:31 * @FilePath: com.muyu.data.processing */ - +@EnableRabbit @EnableMyFeignClients @SpringBootApplication public class MyDataApplication { diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/config/IotDBConfig.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/config/IotDBConfig.java new file mode 100644 index 0000000..ea8719b --- /dev/null +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/config/IotDBConfig.java @@ -0,0 +1,48 @@ +//package com.muyu.data.processing.config; +// +//import org.apache.iotdb.rpc.IoTDBConnectionException; +//import org.apache.iotdb.session.Session; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +///** +// * 时序数据库配置 +// * +// * @Author: 胡杨 +// * @Name: IotDBConfig +// * @Description: 时序数据库配置 +// * @CreatedDate: 2024/9/29 下午9:30 +// * @FilePath: com.muyu.data.processing.config +// */ +// +//@Configuration +//public class IotDBConfig { +// +// @Value("${spring.iotdb.ip}") +// private String ip; +// +// @Value("${spring.iotdb.port}") +// private int port; +// +// @Value("${spring.iotdb.user}") +// private String user; +// +// @Value("${spring.iotdb.password}") +// private String password; +// +// @Value("${spring.iotdb.fetchSize}") +// private int fetchSize; +// +// @Bean +// public Session iotSession(){ +// Session session = new Session(ip, port, user, password, fetchSize); +// try { +// session.open(); +// } catch (IoTDBConnectionException e) { +// throw new RuntimeException(e); +// } +// return session; +// } +// +//} diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index cffd65d..5a7a03c 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -1,13 +1,13 @@ package com.muyu.data.processing.controller; +import com.muyu.common.core.utils.uuid.UUID; import com.muyu.common.kafka.constants.KafkaConstants; -import com.muyu.data.processing.domain.req.TestReq; -import com.muyu.data.processing.domain.resp.TestResp; -import com.muyu.data.processing.strategy.root.RootStrategy; +import com.muyu.common.rabbit.constants.RabbitConstants; import jakarta.annotation.Resource; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; @@ -25,6 +25,10 @@ import lombok.extern.slf4j.Slf4j; public class TestController { @Resource private KafkaProducer kafkaProducer; + @Resource + private RabbitTemplate rabbitTemplate; +// @Resource +// private IotDBConfig iotDBConfig; @GetMapping("/testKafka") public void sendMsg(@RequestParam("msg") String msg) { @@ -39,18 +43,18 @@ public class TestController { },{ "key": "timestamp", "label": "时间戳", - "type": "long", - "value": 1727534036893L + "type": "String", + "value": "1727534036893" },{ "key": "latitude", "label": "纬度", - "type": "float", - "value": 66.898F + "type": "String", + "value": "66.898" },{ "key": "longitude", "label": "经度", - "type": "float", - "value": 99.124F + "type": "String", + "value": "99.12" }]"""; ProducerRecord producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); kafkaProducer.send(producerRecord); @@ -61,12 +65,25 @@ public class TestController { } } + @GetMapping("/testRabbit/GoOnline") + public void testRabbitGoOnline(@RequestParam("msg") String msg) { + rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg, message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-","")); + return message; + }); + } -// @Resource -// private RootStrategy rootStrategy; -// -// @PostMapping("/testStrategy") -// public TestResp testStrategy(@RequestBody TestReq testReq) { -// return rootStrategy.applyStrategy(testReq); + @GetMapping("/testRabbit/Downline") + public void testRabbitDownline(@RequestParam("msg") String msg) { + rabbitTemplate.convertAndSend(RabbitConstants.DOWNLINE_QUEUE, msg, message -> { + message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-","")); + return message; + }); + } + +// @GetMapping("/insertData") +// public void insertData(String deviceId, long time, double value) throws Exception { +// String sql = String.format("insert into root.one.%s(timestamp, temperature) values (%d, %f)", deviceId, time, value); +// iotDBConfig.iotSession().executeNonQueryStatement(sql); // } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java index 249e952..7353356 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/IotDbData.java @@ -27,10 +27,8 @@ public class IotDbData extends BaseEntity { private String vin; - private String key; - private String label; - private String value; - private String type; + private String latitude; + private String longitude; } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java index d2eb581..0f1b542 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/domain/KafkaData.java @@ -26,28 +26,14 @@ public class KafkaData implements Serializable { private String key; private String label; - private Object value; + private String value; private String type; - public void setType(String type) { - this.type = type; - if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){ - setValueClass(); - } - } - - public void setValue(Object value) { - this.value = value; - if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){ - setValueClass(); - } - } - - public void setValueClass() { - Class info = ClassType.getInfo(type); - if (info.isInstance(value)){ - value = info.cast(value); - } - } +// public void setValueClass() { +// Class info = ClassType.getInfo(type); +// if (info.isInstance(value)){ +// value = info.cast(value); +// } +// } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java index c14a1f0..dfb4e79 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/mapper/DataProcessingMapper.java @@ -27,4 +27,6 @@ public interface DataProcessingMapper{ Integer insIotDbData(@Param("key") String key, @Param("value") String value); void strategyCheck(@Param("dataList") List dataList); + + Integer insIotDbDataVo(IotDbData build); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbit.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java similarity index 86% rename from cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbit.java rename to cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java index b679a91..07397ad 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbit.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/DownlineRabbitConsumer.java @@ -2,8 +2,8 @@ package com.muyu.data.processing.rebbit; import com.muyu.common.caffeine.CaffeineCacheUtils; +import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; @@ -14,26 +14,27 @@ import java.io.IOException; import java.util.HashSet; /** + * 下线事件监听 * @Author: 胡杨 - * @Name: DownlineRabbit + * @Name: DownlineRabbitConsumer * @Description: 车辆下线监听器 * @CreatedDate: 2024/9/26 下午8:21 * @FilePath: com.muyu.data.processing.rebbit */ @Slf4j @Component -public class DownlineRabbit { +public class DownlineRabbitConsumer { private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); private static final HashSet DOWNLINE_SET = new HashSet<>(); - @RabbitListener(queuesToDeclare = {@Queue("Downline")}) + @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)}) public void downline(String vin, Message message, Channel channel) { log.info("车辆 {} 下线, 配置信息准备中。。。",vin); try { // 重复性校验 if (DOWNLINE_SET.add(message.getMessageProperties().getMessageId())) { - caffeineCacheUtils.deleteCarCache(vin); +// caffeineCacheUtils.deleteCarCache(vin); log.info("车辆 {} 下线, 消息已确认。。。",vin); } else { log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbit.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java similarity index 86% rename from cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbit.java rename to cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java index 9bfa927..e69bae4 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbit.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/rebbit/GoOnlineRabbitConsumer.java @@ -2,6 +2,7 @@ package com.muyu.data.processing.rebbit; import com.muyu.common.caffeine.CaffeineCacheUtils; +import com.muyu.common.rabbit.constants.RabbitConstants; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; @@ -9,32 +10,32 @@ import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; -import javax.annotation.Resource; import java.io.IOException; import java.util.HashSet; /** + * 上线时间监听 * @Author: 胡杨 - * @Name: GoOnlineRabbit + * @Name: GoOnlineRabbitConsumer * @Description: 上线事件 * @CreatedDate: 2024/9/26 下午7:38 * @FilePath: com.muyu.data.processing.rebbit */ @Slf4j @Component -public class GoOnlineRabbit { +public class GoOnlineRabbitConsumer { private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); private static final HashSet DATA_SET = new HashSet<>(); - @RabbitListener(queuesToDeclare = {@Queue("GoOnline")}) + @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)}) public void goOnline(String vin, Message message, Channel channel){ log.info("车辆 {} 上线, 配置信息准备中。。。",vin); try { // 重复性校验 if (DATA_SET.add(message.getMessageProperties().getMessageId())) { - caffeineCacheUtils.addCarCache(vin); +// caffeineCacheUtils.addCarCache(vin); log.info("车辆 {} 上线, 消息已确认。。。",vin); } else { log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java index 886db94..bb4cd8b 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/service/impl/DataProcessingServiceImpl.java @@ -3,7 +3,6 @@ package com.muyu.data.processing.service.impl; import javax.annotation.Resource; -import com.muyu.common.core.domain.Result; import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.KafkaData; import com.muyu.data.processing.strategy.root.RootStrategy; @@ -12,7 +11,6 @@ import lombok.extern.slf4j.Slf4j; import com.muyu.data.processing.mapper.DataProcessingMapper; import com.muyu.data.processing.service.DataProcessingService; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -34,6 +32,7 @@ public class DataProcessingServiceImpl implements DataProcessingService { @Resource private RootStrategy rootStrategy; + @Override public List selectStorageGroup() { return mapper.selectStorageGroup(); @@ -43,10 +42,16 @@ public class DataProcessingServiceImpl implements DataProcessingService { public void strategyCheck(List dataList) { HashMap kafkaDataHashMap = new HashMap<>(); dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data)); - Result result = rootStrategy.applyStrategy(kafkaDataHashMap); - String[] data = result.getData(); - insIotDbData(data[0],data[1]); - +// Result result = rootStrategy.applyStrategy(kafkaDataHashMap); +// String[] data = result.getData(); +// insIotDbData(data[0],data[1]); + IotDbData build = IotDbData.builder() + .vin(kafkaDataHashMap.get("vin").getValue()) + .timestamp(Long.parseLong(kafkaDataHashMap.get("timestamp").getValue())) + .latitude(kafkaDataHashMap.get("latitude").getValue()) + .longitude(kafkaDataHashMap.get("longitude").getValue()) + .build(); + mapper.insIotDbDataVo(build); // dataList.forEach(KafkaData::setValueClass); // mapper.strategyCheck(dataList); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java index 5763eb6..e86f059 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/strategy/leaves/InsIotDbStrategy.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Set; /** - * 策略-时序数据库新增车辆信息 + * 策略-时序新增-车辆信息 * * @Author: 胡杨 * @Name: InsIotDbStrategy diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java index d65ee96..5b9e90f 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/utils/DataUtils.java @@ -15,10 +15,6 @@ import org.springframework.stereotype.Component; @Component public class DataUtils { public static T convert(Object data, Class type) { - if (type.isInstance(data)) { - return type.cast(data); - } else { - throw new IllegalArgumentException("数据 "+data+" 类型不匹配"); - } + return type.cast(data); } } diff --git a/cloud-data-processing/src/main/resources/bootstrap.yml b/cloud-data-processing/src/main/resources/bootstrap.yml index a1de498..337453e 100644 --- a/cloud-data-processing/src/main/resources/bootstrap.yml +++ b/cloud-data-processing/src/main/resources/bootstrap.yml @@ -11,22 +11,13 @@ nacos: # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # Spring spring: - datasource: - username: root - password: root - driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver - url: jdbc:iotdb://47.116.173.119:6667/ - initial-size: 5 - min-idle: 10 - max-active: 20 - max-wait: 60000 - remove-abandoned: true - remove-abandoned-timeout: 30 - time-between-eviction-runs-millis: 60000 - min-evictable-idle-time-millis: 300000 - test-while-idle: false - test-on-borrow: false - test-on-return: false +# iotdb: +# ip: 47.116.173.119 +# port: 6667 +# user: root +# password: root +# fetchSize: 10000 +# maxActive: 10 amqp: deserialization: trust: @@ -77,4 +68,3 @@ spring: logging: level: com.muyu.system.mapper: DEBUG - diff --git a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml index 7a16e4b..2373a13 100644 --- a/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml +++ b/cloud-data-processing/src/main/resources/mapper/processing/DataProcessingMapper.xml @@ -23,6 +23,13 @@ ) + + + insert into + root.one.data + (timestamp, vin, latitude,longitude) + values (#{timestamp}, #{vin}, #{latitude}, #{longitude}) +