dev.data.processing.dataTreating
面包骑士 2024-09-29 22:38:44 +08:00
parent b7351ee49e
commit 53311e2f98
19 changed files with 255 additions and 93 deletions

View File

@ -0,0 +1,48 @@
package com.muyu.common.rabbit.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*
*/
@Component
@Log4j2
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
*
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
String exchange = correlationData.getReturned().getExchange();
String message = correlationData.getReturned().getMessage().getBody().toString();
// 发送异常
log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause);
// TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL
}
}
}

View File

@ -1,4 +1,4 @@
package com.muyu.common.rabbit; package com.muyu.common.rabbit.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;

View File

@ -0,0 +1,41 @@
package com.muyu.common.rabbit.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
*
*/
@Component
@Log4j2
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @PostContructspringspring
*/
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this);
}
/**
*
*
* @param returnedMessage the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息:{},被交换机:{} 回退!退回原因为:{}",
returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText());
// TODO 回退了所有的信息,可做补偿机制
}
}

View File

@ -0,0 +1,15 @@
package com.muyu.common.rabbit.constants;
/**
* rabbit
* @Author:
* @date: 2024/7/10
* @Description: rabbit
* @Version 1.0.0
*/
public class RabbitConstants {
public final static String GO_ONLINE_QUEUE= "GoOnline";
public final static String DOWNLINE_QUEUE= "Downline";
}

View File

@ -1 +1 @@
com.muyu.common.rabbit.RabbitListenerConfigurer #com.muyu.common.rabbit.config.RabbitListenerConfigurer

View File

@ -81,7 +81,7 @@
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId> <artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.9</version> <version>1.2.20</version>
</dependency> </dependency>
@ -96,19 +96,25 @@
<version>3.5.5</version> <version>3.5.5</version>
</dependency> </dependency>
<!-- &lt;!&ndash; Druid &ndash;&gt;-->
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>com.alibaba</groupId>--> <!-- <groupId>org.apache.iotdb</groupId>-->
<!-- <artifactId>druid-spring-boot-3-starter</artifactId>--> <!-- <artifactId>iotdb-session</artifactId>-->
<!-- <version>${druid.version}</version>--> <!-- <version>1.3.2</version>-->
<!-- </dependency>--> <!-- </dependency>-->
<!-- &lt;!&ndash; Dynamic DataSource &ndash;&gt;--> <!-- Druid -->
<!-- <dependency>--> <dependency>
<!-- <groupId>com.baomidou</groupId>--> <groupId>com.alibaba</groupId>
<!-- <artifactId>dynamic-datasource-spring-boot3-starter</artifactId>--> <artifactId>druid-spring-boot-3-starter</artifactId>
<!-- <version>${dynamic-ds.version}</version>--> <version>${druid.version}</version>
<!-- </dependency>--> </dependency>
<!-- Dynamic DataSource -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
<version>${dynamic-ds.version}</version>
</dependency>
</dependencies> </dependencies>

View File

@ -5,6 +5,7 @@ import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients; import com.muyu.common.security.annotation.EnableMyFeignClients;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@ -17,7 +18,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
* @CreatedDate: 2024/9/26 7:31 * @CreatedDate: 2024/9/26 7:31
* @FilePath: com.muyu.data.processing * @FilePath: com.muyu.data.processing
*/ */
@EnableRabbit
@EnableMyFeignClients @EnableMyFeignClients
@SpringBootApplication @SpringBootApplication
public class MyDataApplication { public class MyDataApplication {

View File

@ -0,0 +1,48 @@
//package com.muyu.data.processing.config;
//
//import org.apache.iotdb.rpc.IoTDBConnectionException;
//import org.apache.iotdb.session.Session;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
///**
// * 时序数据库配置
// *
// * @Author: 胡杨
// * @Name: IotDBConfig
// * @Description: 时序数据库配置
// * @CreatedDate: 2024/9/29 下午9:30
// * @FilePath: com.muyu.data.processing.config
// */
//
//@Configuration
//public class IotDBConfig {
//
// @Value("${spring.iotdb.ip}")
// private String ip;
//
// @Value("${spring.iotdb.port}")
// private int port;
//
// @Value("${spring.iotdb.user}")
// private String user;
//
// @Value("${spring.iotdb.password}")
// private String password;
//
// @Value("${spring.iotdb.fetchSize}")
// private int fetchSize;
//
// @Bean
// public Session iotSession(){
// Session session = new Session(ip, port, user, password, fetchSize);
// try {
// session.open();
// } catch (IoTDBConnectionException e) {
// throw new RuntimeException(e);
// }
// return session;
// }
//
//}

View File

@ -1,13 +1,13 @@
package com.muyu.data.processing.controller; package com.muyu.data.processing.controller;
import com.muyu.common.core.utils.uuid.UUID;
import com.muyu.common.kafka.constants.KafkaConstants; import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.data.processing.domain.req.TestReq; import com.muyu.common.rabbit.constants.RabbitConstants;
import com.muyu.data.processing.domain.resp.TestResp;
import com.muyu.data.processing.strategy.root.RootStrategy;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -25,6 +25,10 @@ import lombok.extern.slf4j.Slf4j;
public class TestController { public class TestController {
@Resource @Resource
private KafkaProducer<String,String> kafkaProducer; private KafkaProducer<String,String> kafkaProducer;
@Resource
private RabbitTemplate rabbitTemplate;
// @Resource
// private IotDBConfig iotDBConfig;
@GetMapping("/testKafka") @GetMapping("/testKafka")
public void sendMsg(@RequestParam("msg") String msg) { public void sendMsg(@RequestParam("msg") String msg) {
@ -39,18 +43,18 @@ public class TestController {
},{ },{
"key": "timestamp", "key": "timestamp",
"label": "时间戳", "label": "时间戳",
"type": "long", "type": "String",
"value": 1727534036893L "value": "1727534036893"
},{ },{
"key": "latitude", "key": "latitude",
"label": "纬度", "label": "纬度",
"type": "float", "type": "String",
"value": 66.898F "value": "66.898"
},{ },{
"key": "longitude", "key": "longitude",
"label": "经度", "label": "经度",
"type": "float", "type": "String",
"value": 99.124F "value": "99.12"
}]"""; }]""";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(KafkaConstants.KafkaTopic, jsonString);
kafkaProducer.send(producerRecord); kafkaProducer.send(producerRecord);
@ -61,12 +65,25 @@ public class TestController {
} }
} }
@GetMapping("/testRabbit/GoOnline")
public void testRabbitGoOnline(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend(RabbitConstants.GO_ONLINE_QUEUE, msg, message -> {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-",""));
return message;
});
}
// @Resource @GetMapping("/testRabbit/Downline")
// private RootStrategy rootStrategy; public void testRabbitDownline(@RequestParam("msg") String msg) {
// rabbitTemplate.convertAndSend(RabbitConstants.DOWNLINE_QUEUE, msg, message -> {
// @PostMapping("/testStrategy") message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replace("-",""));
// public TestResp testStrategy(@RequestBody TestReq testReq) { return message;
// return rootStrategy.applyStrategy(testReq); });
}
// @GetMapping("/insertData")
// public void insertData(String deviceId, long time, double value) throws Exception {
// String sql = String.format("insert into root.one.%s(timestamp, temperature) values (%d, %f)", deviceId, time, value);
// iotDBConfig.iotSession().executeNonQueryStatement(sql);
// } // }
} }

View File

@ -27,10 +27,8 @@ public class IotDbData extends BaseEntity {
private String vin; private String vin;
private String key; private String latitude;
private String label; private String longitude;
private String value;
private String type;
} }

View File

@ -26,28 +26,14 @@ public class KafkaData implements Serializable {
private String key; private String key;
private String label; private String label;
private Object value; private String value;
private String type; private String type;
public void setType(String type) { // public void setValueClass() {
this.type = type; // Class<?> info = ClassType.getInfo(type);
if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){ // if (info.isInstance(value)){
setValueClass(); // value = info.cast(value);
} // }
} // }
public void setValue(Object value) {
this.value = value;
if (StringUtils.isNotEmpty(this.type) && ObjectUtils.isNotEmpty(this.value)){
setValueClass();
}
}
public void setValueClass() {
Class<?> info = ClassType.getInfo(type);
if (info.isInstance(value)){
value = info.cast(value);
}
}
} }

View File

@ -27,4 +27,6 @@ public interface DataProcessingMapper{
Integer insIotDbData(@Param("key") String key, @Param("value") String value); Integer insIotDbData(@Param("key") String key, @Param("value") String value);
void strategyCheck(@Param("dataList") List<KafkaData> dataList); void strategyCheck(@Param("dataList") List<KafkaData> dataList);
Integer insIotDbDataVo(IotDbData build);
} }

View File

@ -2,8 +2,8 @@ package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.CaffeineCacheUtils; import com.muyu.common.caffeine.CaffeineCacheUtils;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
@ -14,26 +14,27 @@ import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
/** /**
* 线
* @Author: * @Author:
* @Name: DownlineRabbit * @Name: DownlineRabbitConsumer
* @Description: 线 * @Description: 线
* @CreatedDate: 2024/9/26 8:21 * @CreatedDate: 2024/9/26 8:21
* @FilePath: com.muyu.data.processing.rebbit * @FilePath: com.muyu.data.processing.rebbit
*/ */
@Slf4j @Slf4j
@Component @Component
public class DownlineRabbit { public class DownlineRabbitConsumer {
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
private static final HashSet<String> DOWNLINE_SET = new HashSet<>(); private static final HashSet<String> DOWNLINE_SET = new HashSet<>();
@RabbitListener(queuesToDeclare = {@Queue("Downline")}) @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.DOWNLINE_QUEUE)})
public void downline(String vin, Message message, Channel channel) { public void downline(String vin, Message message, Channel channel) {
log.info("车辆 {} 下线, 配置信息准备中。。。",vin); log.info("车辆 {} 下线, 配置信息准备中。。。",vin);
try { try {
// 重复性校验 // 重复性校验
if (DOWNLINE_SET.add(message.getMessageProperties().getMessageId())) { if (DOWNLINE_SET.add(message.getMessageProperties().getMessageId())) {
caffeineCacheUtils.deleteCarCache(vin); // caffeineCacheUtils.deleteCarCache(vin);
log.info("车辆 {} 下线, 消息已确认。。。",vin); log.info("车辆 {} 下线, 消息已确认。。。",vin);
} else { } else {
log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin); log.info("车辆 {} 下线, 消息重复消费,已确认。。。",vin);

View File

@ -2,6 +2,7 @@ package com.muyu.data.processing.rebbit;
import com.muyu.common.caffeine.CaffeineCacheUtils; import com.muyu.common.caffeine.CaffeineCacheUtils;
import com.muyu.common.rabbit.constants.RabbitConstants;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
@ -9,32 +10,32 @@ import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
/** /**
* 线
* @Author: * @Author:
* @Name: GoOnlineRabbit * @Name: GoOnlineRabbitConsumer
* @Description: 线 * @Description: 线
* @CreatedDate: 2024/9/26 7:38 * @CreatedDate: 2024/9/26 7:38
* @FilePath: com.muyu.data.processing.rebbit * @FilePath: com.muyu.data.processing.rebbit
*/ */
@Slf4j @Slf4j
@Component @Component
public class GoOnlineRabbit { public class GoOnlineRabbitConsumer {
private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils(); private CaffeineCacheUtils caffeineCacheUtils = new CaffeineCacheUtils();
private static final HashSet<String> DATA_SET = new HashSet<>(); private static final HashSet<String> DATA_SET = new HashSet<>();
@RabbitListener(queuesToDeclare = {@Queue("GoOnline")}) @RabbitListener(queuesToDeclare = {@Queue(RabbitConstants.GO_ONLINE_QUEUE)})
public void goOnline(String vin, Message message, Channel channel){ public void goOnline(String vin, Message message, Channel channel){
log.info("车辆 {} 上线, 配置信息准备中。。。",vin); log.info("车辆 {} 上线, 配置信息准备中。。。",vin);
try { try {
// 重复性校验 // 重复性校验
if (DATA_SET.add(message.getMessageProperties().getMessageId())) { if (DATA_SET.add(message.getMessageProperties().getMessageId())) {
caffeineCacheUtils.addCarCache(vin); // caffeineCacheUtils.addCarCache(vin);
log.info("车辆 {} 上线, 消息已确认。。。",vin); log.info("车辆 {} 上线, 消息已确认。。。",vin);
} else { } else {
log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin); log.info("车辆 {} 上线, 消息重复消费,已确认。。。",vin);

View File

@ -3,7 +3,6 @@ package com.muyu.data.processing.service.impl;
import javax.annotation.Resource; import javax.annotation.Resource;
import com.muyu.common.core.domain.Result;
import com.muyu.data.processing.domain.IotDbData; import com.muyu.data.processing.domain.IotDbData;
import com.muyu.data.processing.domain.KafkaData; import com.muyu.data.processing.domain.KafkaData;
import com.muyu.data.processing.strategy.root.RootStrategy; import com.muyu.data.processing.strategy.root.RootStrategy;
@ -12,7 +11,6 @@ import lombok.extern.slf4j.Slf4j;
import com.muyu.data.processing.mapper.DataProcessingMapper; import com.muyu.data.processing.mapper.DataProcessingMapper;
import com.muyu.data.processing.service.DataProcessingService; import com.muyu.data.processing.service.DataProcessingService;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -34,6 +32,7 @@ public class DataProcessingServiceImpl implements DataProcessingService {
@Resource @Resource
private RootStrategy rootStrategy; private RootStrategy rootStrategy;
@Override @Override
public List<String> selectStorageGroup() { public List<String> selectStorageGroup() {
return mapper.selectStorageGroup(); return mapper.selectStorageGroup();
@ -43,10 +42,16 @@ public class DataProcessingServiceImpl implements DataProcessingService {
public void strategyCheck(List<KafkaData> dataList) { public void strategyCheck(List<KafkaData> dataList) {
HashMap<String, KafkaData> kafkaDataHashMap = new HashMap<>(); HashMap<String, KafkaData> kafkaDataHashMap = new HashMap<>();
dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data)); dataList.forEach(data -> kafkaDataHashMap.put(data.getKey(), data));
Result<String[]> result = rootStrategy.applyStrategy(kafkaDataHashMap); // Result<String[]> result = rootStrategy.applyStrategy(kafkaDataHashMap);
String[] data = result.getData(); // String[] data = result.getData();
insIotDbData(data[0],data[1]); // insIotDbData(data[0],data[1]);
IotDbData build = IotDbData.builder()
.vin(kafkaDataHashMap.get("vin").getValue())
.timestamp(Long.parseLong(kafkaDataHashMap.get("timestamp").getValue()))
.latitude(kafkaDataHashMap.get("latitude").getValue())
.longitude(kafkaDataHashMap.get("longitude").getValue())
.build();
mapper.insIotDbDataVo(build);
// dataList.forEach(KafkaData::setValueClass); // dataList.forEach(KafkaData::setValueClass);
// mapper.strategyCheck(dataList); // mapper.strategyCheck(dataList);
} }

View File

@ -17,7 +17,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
* - * --
* *
* @Author: * @Author:
* @Name: InsIotDbStrategy * @Name: InsIotDbStrategy

View File

@ -15,10 +15,6 @@ import org.springframework.stereotype.Component;
@Component @Component
public class DataUtils { public class DataUtils {
public static <T> T convert(Object data, Class<T> type) { public static <T> T convert(Object data, Class<T> type) {
if (type.isInstance(data)) {
return type.cast(data); return type.cast(data);
} else {
throw new IllegalArgumentException("数据 "+data+" 类型不匹配");
}
} }
} }

View File

@ -11,22 +11,13 @@ nacos:
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring:
datasource: # iotdb:
username: root # ip: 47.116.173.119
password: root # port: 6667
driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver # user: root
url: jdbc:iotdb://47.116.173.119:6667/ # password: root
initial-size: 5 # fetchSize: 10000
min-idle: 10 # maxActive: 10
max-active: 20
max-wait: 60000
remove-abandoned: true
remove-abandoned-timeout: 30
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
test-while-idle: false
test-on-borrow: false
test-on-return: false
amqp: amqp:
deserialization: deserialization:
trust: trust:
@ -77,4 +68,3 @@ spring:
logging: logging:
level: level:
com.muyu.system.mapper: DEBUG com.muyu.system.mapper: DEBUG

View File

@ -23,6 +23,13 @@
) )
</insert> </insert>
<insert id="insIotDbDataVo">
insert into
root.one.data
(timestamp, vin, latitude,longitude)
values (#{timestamp}, #{vin}, #{latitude}, #{longitude})
</insert>
</mapper> </mapper>