car-analysis
commit
e32267a063
|
@ -0,0 +1,46 @@
|
|||
######################################################################
|
||||
# Build Tools
|
||||
|
||||
.gradle
|
||||
/build/
|
||||
!gradle/wrapper/gradle-wrapper.jar
|
||||
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
|
||||
######################################################################
|
||||
# IDE
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### JRebel ###
|
||||
rebel.xml
|
||||
### NetBeans ###
|
||||
nbproject/private/
|
||||
build/*
|
||||
nbbuild/
|
||||
dist/
|
||||
nbdist/
|
||||
.nb-gradle/
|
||||
|
||||
######################################################################
|
||||
# Others
|
||||
*.log
|
||||
*.xml.versionsBackup
|
||||
*.swp
|
||||
|
||||
!*/build/*.java
|
||||
!*/build/*.html
|
||||
!*/build/*.xml
|
|
@ -0,0 +1,23 @@
|
|||
FROM openjdk:17-ea-slim
|
||||
LABEL authors="Car-two <3079188394@qq.com>"
|
||||
|
||||
RUN mkdir /car
|
||||
|
||||
# 暴露端口
|
||||
EXPOSE 9456
|
||||
|
||||
# 创建着陆点
|
||||
WORKDIR "/car"
|
||||
|
||||
# 复制新的运行程序
|
||||
COPY ./ruoyi-analysis-server/target/ruoyi-analysis-server.jar /car/app.jar
|
||||
|
||||
|
||||
# 挂载持续的目录
|
||||
VOLUME /car/logs/ruoyi-analysis-server
|
||||
|
||||
|
||||
# 运行你的jar包
|
||||
CMD ["java","-jar","/car/app.jar"]
|
||||
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
<version>3.6.3</version>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>ruoyi-analysis</artifactId>
|
||||
|
||||
<description>
|
||||
ruoyi-modules-analysis系统模块
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Nacos -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
<!-- SpringCloud Alibaba Nacos Config -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Alibaba Sentinel -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringBoot Actuator -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Swagger UI -->
|
||||
<dependency>
|
||||
<groupId>io.springfox</groupId>
|
||||
<artifactId>springfox-swagger-ui</artifactId>
|
||||
<version>${swagger.fox.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Mysql Connector -->
|
||||
<dependency>
|
||||
<groupId>com.mysql</groupId>
|
||||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-analysis-remote</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
|
||||
<!-- RuoYi Common DataSource -->
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-datasource</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
|
||||
<!-- RuoYi Common DataScope -->
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-datascope</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
|
||||
<!-- RuoYi Common Log -->
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-log</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
<!-- RuoYi Common Swagger -->
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-swagger</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-redis</artifactId>
|
||||
<version>3.6.3</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>feign-slf4j</artifactId>
|
||||
<groupId>io.github.openfeign</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.ruoyi</groupId>
|
||||
<artifactId>ruoyi-file-remote</artifactId>
|
||||
<version>3.6.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testng</groupId>
|
||||
<artifactId>testng</artifactId>
|
||||
<version>7.8.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<version>2.4.7</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-reload4j</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- kafka-->
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-core-spring-boot-starter</artifactId>
|
||||
<version>1.3.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
</project>
|
|
@ -0,0 +1,38 @@
|
|||
package com.ruoyi.analysis;
|
||||
|
||||
import com.ruoyi.common.security.annotation.EnableCustomConfig;
|
||||
import com.ruoyi.common.security.annotation.EnableRyFeignClients;
|
||||
import com.ruoyi.common.swagger.annotation.EnableCustomSwagger2;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* 启动类
|
||||
*/
|
||||
@EnableCustomConfig
|
||||
@EnableCustomSwagger2
|
||||
@EnableRyFeignClients
|
||||
@EnableScheduling
|
||||
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class })
|
||||
public class RuoYianalysisApplication {
|
||||
|
||||
public static void main(String[] args)
|
||||
{
|
||||
SpringApplication.run(RuoYianalysisApplication.class, args);
|
||||
System.out.println("(♥◠‿◠)ノ゙ 系统模块启动成功 ლ(´ڡ`ლ)゙ \n" +
|
||||
" .-------. ____ __ \n" +
|
||||
" | _ _ \\ \\ \\ / / \n" +
|
||||
" | ( ' ) | \\ _. / ' \n" +
|
||||
" |(_ o _) / _( )_ .' \n" +
|
||||
" | (_,_).' __ ___(_ o _)' \n" +
|
||||
" | |\\ \\ | || |(_,_)' \n" +
|
||||
" | | \\ `' /| `-' / \n" +
|
||||
" | | \\ / \\ / \n" +
|
||||
" ''-' `'-' `-..-' ");
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
package com.ruoyi.analysis.config;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.ruoyi.analysis.constants.*;
|
||||
import com.ruoyi.analysis.domain.VehicleMessage;
|
||||
import com.ruoyi.analysis.handler.MessageHandler;
|
||||
import com.ruoyi.analysis.handler.MessageVerify;
|
||||
import com.ruoyi.analysis.hbase.service.HBaseService;
|
||||
import com.ruoyi.common.redis.service.RedisService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.annotation.TopicPartition;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* Kafka消费者报文
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KafkaConsumer {
|
||||
/**
|
||||
* 注入线程池
|
||||
*/
|
||||
@Autowired
|
||||
private ThreadPoolExecutor threadPoolExecutor;
|
||||
|
||||
/**
|
||||
* 注入redis
|
||||
*/
|
||||
@Autowired
|
||||
private RedisService redisService;
|
||||
/**
|
||||
* 注入rabbitMQ
|
||||
*/
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
/**
|
||||
* Hbase工具类
|
||||
*/
|
||||
@Autowired
|
||||
private HBaseService hBaseService;
|
||||
/**
|
||||
* 初始化线程池
|
||||
*/
|
||||
private ExecutorService executorService= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
|
||||
|
||||
|
||||
private SimpleDateFormat tableNameFmt = new SimpleDateFormat("yyyy-MM-dd");
|
||||
|
||||
/**
|
||||
* kafka消息接收端
|
||||
*/
|
||||
@KafkaListener(topicPartitions = {@TopicPartition(topic = "test",partitions = {"2"})})
|
||||
public void OnMessage(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment) throws InterruptedException{
|
||||
try {
|
||||
//提交到线程池执行
|
||||
threadPoolExecutor.submit(()->{
|
||||
messageHandler(consumerRecord); //处理消息方法体;
|
||||
});
|
||||
}finally {
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
* 处理消息方法体
|
||||
*/
|
||||
public void messageHandler(ConsumerRecord<?,?> consumerRecord){
|
||||
//调用Optional防止空指针异常
|
||||
Optional<?> value=Optional.ofNullable(consumerRecord.value());
|
||||
String decimalInfo=null;
|
||||
String StringMessage=null;
|
||||
if (value.isPresent()){
|
||||
//获取到消息的内容
|
||||
Object object = value.get();
|
||||
StringMessage = (String) object;
|
||||
|
||||
|
||||
log.info("报文内容:"+StringMessage);
|
||||
String content = StringMessage.substring(4, StringMessage.length() - 7);
|
||||
decimalInfo = MessageVerify.theDecimal(content);
|
||||
String vin = "";
|
||||
if (decimalInfo.contains(VehicleConstant.NETTY_WILL_CLOSE)) {
|
||||
vin = decimalInfo.substring(VehicleConstant.NETTY_WILL_CLOSE.length(), VehicleConstant.NETTY_WILL_CLOSE.length() + 17);
|
||||
}
|
||||
//车辆启动
|
||||
if (decimalInfo.contains(VehicleConstant.VEHICLE_START_SUF)) {
|
||||
|
||||
vin = decimalInfo.substring(VehicleConstant.VEHICLE_START_SUF.length(), VehicleConstant.VEHICLE_START_SUF.length() + 17);
|
||||
|
||||
|
||||
log.info("车辆启动信息;vin:{};内容:{}", vin, decimalInfo);
|
||||
VehicleStatusEntity statusEntity = new VehicleStatusEntity(vin, VehicleConstants.VEHICLE_START, System.currentTimeMillis());
|
||||
rabbitTemplate.convertSendAndReceive(VehicleConstants.VEHICLE_STATUS_UPDATE_QUEUE, JSON.toJSONString(statusEntity),message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
return;
|
||||
}
|
||||
//车辆停止
|
||||
if (decimalInfo.contains(VehicleConstant.VEHICLE_STOP_SUF)) {
|
||||
vin = decimalInfo.substring(VehicleConstant.VEHICLE_STOP_SUF.length(), VehicleConstant.VEHICLE_STOP_SUF.length() + 17);
|
||||
log.info("车辆停止信息;vin:{};内容:{}", vin, decimalInfo);
|
||||
VehicleStatusEntity statusEntity = new VehicleStatusEntity(vin, VehicleConstants.VEHICLE_END, System.currentTimeMillis());
|
||||
rabbitTemplate.convertSendAndReceive(VehicleConstants.VEHICLE_STATUS_UPDATE_QUEUE, JSON.toJSONString(statusEntity),message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
return;
|
||||
}
|
||||
//车辆消息
|
||||
if (decimalInfo.contains(VehicleConstant.VEHICLE_MSG_SUF)) {
|
||||
vin = decimalInfo.substring(VehicleConstant.VEHICLE_MSG_SUF.length(), VehicleConstant.VEHICLE_MSG_SUF.length() + 17);
|
||||
//拼接上时间戳
|
||||
decimalInfo += System.currentTimeMillis();
|
||||
log.info("车辆普通消息;vin:{};内容:{}", vin, decimalInfo);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("topic:{},分区:{},消息内容:{}","vehicle_message:"+consumerRecord.partition(),decimalInfo);
|
||||
//将车辆消息转为实体类
|
||||
VehicleMessage vehicleMessage= MessageHandler.messageTranslated(decimalInfo);
|
||||
log.info("切割过后的报文:"+vehicleMessage);
|
||||
//车辆实时数据存入Redis
|
||||
redisService.setCacheObject(RedisConstant.CURRENT_INFO+":"+vehicleMessage.getVinCode(),JSON.toJSONString(vehicleMessage));
|
||||
if(vehicleMessage.getBatteryCurrent()!=null){
|
||||
//车辆故障处理
|
||||
rabbitTemplate.convertSendAndReceive(RabbitConstant.FAULT_HANDLER_QUEUE,JSON.toJSONString(decimalInfo),message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
}
|
||||
//车辆围栏处理
|
||||
rabbitTemplate.convertSendAndReceive(RabbitConstant.FENCE_HANDLER_QUEUE,JSON.toJSONString(vehicleMessage),message -> {
|
||||
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
|
||||
return message;
|
||||
});
|
||||
try {
|
||||
String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNameFmt.format(new Date(vehicleMessage.getTime()));
|
||||
log.info("信息入库;database:{},vin:{},time:{}",tableName,vehicleMessage.getVinCode(),System.currentTimeMillis());
|
||||
hBaseService.insertRecordsForObj(tableName,vehicleMessage.getVinCode()+":"+vehicleMessage.getTime(),"analysis",vehicleMessage);
|
||||
} catch (IOException e) {
|
||||
log.error("添加到HBase出错:{}",e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
package com.ruoyi.analysis.config;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.constants.RabbitConstant;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* RabbitMq队列创建
|
||||
*/
|
||||
@Component
|
||||
public class RabbitConfig {
|
||||
/**
|
||||
* 创建故障处理队列
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public Queue initFaultHandlerQueue(){
|
||||
return new Queue(RabbitConstant.FAULT_HANDLER_QUEUE,true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建围栏处理队列
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public Queue initFenceHandlerQueue(){
|
||||
return new Queue(RabbitConstant.FENCE_HANDLER_QUEUE,true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.ruoyi.analysis.config;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.constants.HBaseConstant;
|
||||
import com.ruoyi.analysis.hbase.service.impl.HBaseServiceImpl;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
*启动判断当前日期创建表
|
||||
*/
|
||||
@Component
|
||||
public class RunnerAfter implements ApplicationRunner {
|
||||
@Autowired
|
||||
private HBaseServiceImpl hBaseServiceImpl;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||
String nowDay = sdf.format(new Date());
|
||||
//项目启动时根据当前日期判断是否创建表
|
||||
if(!hBaseServiceImpl.isTableExists(HBaseConstant.HBASE_TABLE_PREFIX+nowDay)){
|
||||
hBaseServiceImpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+nowDay,new String[]{"analysis"});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.ruoyi.analysis.config;
|
||||
|
||||
import cn.hippo4j.core.executor.DynamicThreadPool;
|
||||
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* 线程池配置封装
|
||||
*/
|
||||
@Configuration
|
||||
public class ThreadPoolConfig {
|
||||
@Bean
|
||||
@DynamicThreadPool
|
||||
public ThreadPoolExecutor alanAnalysisThreadPool(){
|
||||
String threadPoolId = "networking-analysis";
|
||||
ThreadPoolExecutor messageConsumeDynamicExecutor = ThreadPoolBuilder.builder()
|
||||
.threadFactory(threadPoolId)//工厂名称
|
||||
.threadPoolId(threadPoolId)//线程池名称
|
||||
.dynamicPool()//线程池特性
|
||||
.build();//调用方法
|
||||
return messageConsumeDynamicExecutor;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.ruoyi.analysis.config;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.constants.HBaseConstant;
|
||||
import com.ruoyi.analysis.hbase.service.impl.HBaseServiceImpl;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 定时每日创建表重新赋值新表地址
|
||||
*/
|
||||
@Component
|
||||
public class TimeConfig {
|
||||
@Autowired
|
||||
private HBaseServiceImpl hBaseServiceImpl;
|
||||
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||
|
||||
|
||||
@Scheduled(cron = "0 0 0 * * ?")
|
||||
public void flushNowDay() throws IOException {
|
||||
String nowDay = sdf.format(new Date());
|
||||
//判断表是否存在
|
||||
if(!hBaseServiceImpl.isTableExists(HBaseConstant.HBASE_TABLE_PREFIX+nowDay)){
|
||||
//不存在创建表
|
||||
hBaseServiceImpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+nowDay,new String[]{"analysis"});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* Hbase常量
|
||||
*/
|
||||
public class HBaseConstant {
|
||||
/**
|
||||
* HBase表名
|
||||
*/
|
||||
public static final String HBASE_TABLE_NAME = "vehicle_info";
|
||||
|
||||
/**
|
||||
* 表名前缀
|
||||
*/
|
||||
public static final String HBASE_TABLE_PREFIX = "vehicle_info_";
|
||||
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* kafka常量
|
||||
**/
|
||||
public class KafkaConstant {
|
||||
/**
|
||||
* kafka的topic
|
||||
*/
|
||||
public static final String ANALYSIS_MESSAGE="analysis_message";
|
||||
|
||||
/**
|
||||
* kafka的ip
|
||||
*/
|
||||
public static final String KAFKA_SERVERS = "43.142.96.146:9092,43.142.96.146:9093,43.142.96.146:9094";
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* RabbitMQ常量
|
||||
*/
|
||||
public class RabbitConstant {
|
||||
/**
|
||||
* 故障处理队列
|
||||
*/
|
||||
public static final String FAULT_HANDLER_QUEUE="fault_handler_queue";
|
||||
/**
|
||||
* 车辆围栏队列
|
||||
*/
|
||||
public static final String FENCE_HANDLER_QUEUE="fence_handler_queue";
|
||||
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* redis常量
|
||||
*/
|
||||
public class RedisConstant {
|
||||
/**
|
||||
* redis的实时信息
|
||||
*/
|
||||
public static final String CURRENT_INFO="current_info";
|
||||
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* 天时间戳
|
||||
*/
|
||||
public class TimeConstant {
|
||||
|
||||
/**
|
||||
* 一天的时间戳
|
||||
*/
|
||||
|
||||
public static final Integer DAY_MILLISECOND=86400000;
|
||||
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* 连接常量
|
||||
*/
|
||||
public class VehicleConstant {
|
||||
/**
|
||||
* 分包符
|
||||
*/
|
||||
public static final String DATA_PACK_SEPARATOR = "#$&*";
|
||||
|
||||
/**
|
||||
* 报文起始位
|
||||
*/
|
||||
public static final String MSG_START = "7E ";
|
||||
|
||||
/**
|
||||
* 报文结束位
|
||||
*/
|
||||
public static final String MSG_END = "7E";
|
||||
|
||||
/**
|
||||
* 连接消息VIN
|
||||
*/
|
||||
public final static String START_VIN_SUF = "START_VIN:";
|
||||
/**
|
||||
* 连接消息VIN
|
||||
*/
|
||||
public final static String START_VIN_SUCCESS_SUF = "SUCCESS_VIN:";
|
||||
/**
|
||||
* 车辆消息报文前缀
|
||||
*/
|
||||
public final static String VEHICLE_MSG_SUF = "VEHICLE_MSG:";
|
||||
/**
|
||||
* 车辆启动报文前缀
|
||||
*/
|
||||
public final static String VEHICLE_START_SUF = "VEHICLE_START:";
|
||||
/**
|
||||
* 车辆关闭报文前缀
|
||||
*/
|
||||
public final static String VEHICLE_STOP_SUF = "VEHICLE_STOP:";
|
||||
|
||||
/**
|
||||
* 连接启动信息
|
||||
*/
|
||||
public final static String NETTY_CONNECT = "CONNECT";
|
||||
/**
|
||||
* 连接关闭信息
|
||||
*/
|
||||
public final static String NETTY_WILL_CLOSE = "WILL_CLOSE:";
|
||||
/**
|
||||
* 连接关闭信息
|
||||
*/
|
||||
public final static String NETTY_CLOSE = "CLOSE";
|
||||
|
||||
/**
|
||||
* 车辆VIN正则表达式
|
||||
*/
|
||||
public final static String VIN_REGEX = "^(?![0-9]+$)(?![a-zA-Z]+$)[0-9A-Za-z]{17}$";
|
||||
|
||||
|
||||
/**
|
||||
* 车辆基础故障组
|
||||
*/
|
||||
public final static String VEHICLE_BASE_FAULT = "vehicle_base";
|
||||
|
||||
/**
|
||||
* 车辆零配件故障组
|
||||
*/
|
||||
public final static String VEHICLE_PARTS_FAULT = "vehicle_parts";
|
||||
|
||||
/**
|
||||
* 车辆电池故障组
|
||||
*/
|
||||
public final static String VEHICLE_BATTERY_FAULT = "vehicle_battery";
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* 车辆常量
|
||||
*/
|
||||
public class VehicleConstants {
|
||||
/**
|
||||
* 车辆启动
|
||||
*/
|
||||
public static final Integer VEHICLE_START = 1;
|
||||
|
||||
/**
|
||||
* 车辆停止
|
||||
*/
|
||||
public static final Integer VEHICLE_END = 0;
|
||||
|
||||
/**
|
||||
* 车辆状态更改队列
|
||||
*/
|
||||
public static final String VEHICLE_STATUS_UPDATE_QUEUE = "vehicle_status_update_queue";
|
||||
|
||||
/**
|
||||
* 车辆信息前缀
|
||||
*/
|
||||
public static final String VEHICLE_INFO_MAP = "vehicle_info_map";
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package com.ruoyi.analysis.constants;
|
||||
|
||||
/**
|
||||
* 时间错车辆状态
|
||||
*/
|
||||
public class VehicleStatusEntity {
|
||||
/**
|
||||
* vin
|
||||
*/
|
||||
private String vin;
|
||||
/**
|
||||
* 车辆状态
|
||||
*/
|
||||
private Integer status;
|
||||
/**
|
||||
* 车辆时间戳
|
||||
*/
|
||||
private long timestamp;
|
||||
|
||||
public String getVin() {
|
||||
return vin;
|
||||
}
|
||||
|
||||
public void setVin(String vin) {
|
||||
this.vin = vin;
|
||||
}
|
||||
|
||||
public Integer getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(Integer status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public VehicleStatusEntity(String vin, Integer status, long timestamp) {
|
||||
this.vin = vin;
|
||||
this.status = status;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package com.ruoyi.analysis.controller;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.ruoyi.analysis.constants.HBaseConstant;
|
||||
import com.ruoyi.analysis.constants.TimeConstant;
|
||||
import com.ruoyi.analysis.domain.VehicleMessage;
|
||||
import com.ruoyi.analysis.hbase.service.HBaseService;
|
||||
import com.ruoyi.collect.domain.CarRecord;
|
||||
import com.ruoyi.common.core.web.domain.AjaxResult;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 查询历史轨迹
|
||||
* @Author: JCC
|
||||
* @Date: 2023/8/30 18:44
|
||||
* @Description:
|
||||
*/
|
||||
@Log4j2
|
||||
@RestController
|
||||
@RequestMapping("/analysis")
|
||||
public class AnalysisController {
|
||||
|
||||
@Autowired
|
||||
private HBaseService hBaseService;
|
||||
|
||||
public static SimpleDateFormat targetDay = new SimpleDateFormat("yyyy-MM-dd");
|
||||
|
||||
@PostMapping("queryHistory")
|
||||
public AjaxResult queryHistory(@RequestBody CarRecord driverRecord) throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException, ParseException {
|
||||
log.info("查询=开始时间:{},结束时间:{}",new Date(Long.parseLong(driverRecord.getStartKey())).toLocaleString(),
|
||||
new Date(Long.parseLong(driverRecord.getEndKey())).toLocaleString());
|
||||
//根据输入输出时间格式化日期
|
||||
String startDay = targetDay.format(new Date(Long.parseLong(driverRecord.getStartKey())));
|
||||
String endDay = targetDay.format(new Date(Long.parseLong(driverRecord.getEndKey())));
|
||||
//判断开始结束是否同一天
|
||||
if (startDay.equals(endDay)){
|
||||
//同一天直接通过开始、结束时间戳查询
|
||||
List<VehicleMessage> list = hBaseService.scanRange(HBaseConstant.HBASE_TABLE_PREFIX+startDay,
|
||||
driverRecord.getCarVin()+":"+driverRecord.getStartKey(), driverRecord.getCarVin()+":"+driverRecord.getEndKey());
|
||||
return AjaxResult.success(list);
|
||||
}
|
||||
|
||||
List<VehicleMessage> list = Lists.newArrayList();
|
||||
//定义当前日期名称
|
||||
String indexDateTime = "";
|
||||
//获取开始时间时间戳
|
||||
long startTime = Long.parseLong(driverRecord.getStartKey());
|
||||
//获取结束时间当日00:00的时间戳
|
||||
long endTime = Long.parseLong(driverRecord.getEndKey());
|
||||
indexDateTime = HBaseConstant.HBASE_TABLE_PREFIX + targetDay.format(new Date(startTime));
|
||||
//将第一天的数据插入集合
|
||||
list.addAll(hBaseService.scanRangeByStart(indexDateTime,driverRecord.getCarVin()+":"+startTime));
|
||||
long indexTime = targetDay.parse(targetDay.format(startTime+ TimeConstant.DAY_MILLISECOND)).getTime();
|
||||
long count = (endTime - indexTime) / TimeConstant.DAY_MILLISECOND;
|
||||
for (long l = 0; l < count; l++) {
|
||||
String tempTableName = HBaseConstant.HBASE_TABLE_PREFIX + targetDay.format(indexTime);
|
||||
list.addAll(hBaseService.scanRangeByStart(tempTableName,driverRecord.getCarVin()+":"+indexTime));
|
||||
indexTime += TimeConstant.DAY_MILLISECOND;
|
||||
}
|
||||
//根据最后一天的时间戳格式化时间
|
||||
String format = targetDay.format(new Date(indexTime));
|
||||
//获得最后一天的数据添加到集合
|
||||
list.addAll(hBaseService.scanRange(HBaseConstant.HBASE_TABLE_PREFIX+format,
|
||||
driverRecord.getCarVin()+":"+indexTime,driverRecord.getCarVin()+":"+endTime));
|
||||
return AjaxResult.success(list);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,583 @@
|
|||
package com.ruoyi.analysis.domain;
|
||||
|
||||
/**
|
||||
* 报文实体类
|
||||
*/
|
||||
public class VehicleMessage {
|
||||
//消息标识
|
||||
private String identification;
|
||||
//VIN码
|
||||
private String vinCode;
|
||||
|
||||
//经度
|
||||
private String longitude;
|
||||
//纬度
|
||||
private String latitude;
|
||||
//车速
|
||||
private String speedOfVehicle;
|
||||
//总里程
|
||||
private String totalMileage;
|
||||
//总电压
|
||||
private String totalVoltage;
|
||||
//总电流
|
||||
private String combinedCurrent;
|
||||
//绝缘电阻
|
||||
private String insulationResistance;
|
||||
//档位
|
||||
private String gearPosition;
|
||||
//加速踏板行程值
|
||||
private String acceleratorPedalTravelValue;
|
||||
//制动踏板行程值
|
||||
private String brakePedalTravelValue;
|
||||
//燃料消耗率
|
||||
private String specificFuelConsumption;
|
||||
//电机控制器温度
|
||||
private String motorControllerTemperature;
|
||||
//电机转速
|
||||
private String motorSpeed;
|
||||
//电机转矩
|
||||
private String motorTorque;
|
||||
//电机温度
|
||||
private String motorTemperature;
|
||||
//电机电压
|
||||
private String motorVoltage;
|
||||
//电机电流
|
||||
private String motorCurrent;
|
||||
//动力电池剩余电量SOC
|
||||
private String electricalSoc;
|
||||
//当前状态允许的最大反馈功率
|
||||
private String maximumFeedbackPower;
|
||||
//当前状态允许最大放电功率
|
||||
private String maximumDischargePower;
|
||||
//BMS自检计数器
|
||||
private String bms;
|
||||
//动力电池充放电电流
|
||||
private String batteryCurrent;
|
||||
//动力电池负载端总电压V3
|
||||
private String v3;
|
||||
//单次最大电压
|
||||
private String singleMaximumVoltage;
|
||||
//单体电池最低电压
|
||||
private String minimumVoltageOfBattery;
|
||||
//单体电池最高温度
|
||||
private String maximumBatteryTemperature;
|
||||
//单体电池最低温度
|
||||
private String minimumBatteryTemperature;
|
||||
//动力电池可用容量
|
||||
private String powerBatteryAvailableCapacity;
|
||||
//车辆状态
|
||||
private Integer carStatus;
|
||||
//充电状态
|
||||
private Integer chargingState;
|
||||
//运行状态
|
||||
private Integer runningState;
|
||||
//SOC
|
||||
private String soc;
|
||||
//可充电储能装置工作状态
|
||||
private String workingCondition;
|
||||
//驱动电机状态
|
||||
private String driveMotorCondition;
|
||||
//定位是否有效
|
||||
private String whetherTheLocationValid;
|
||||
//EAS
|
||||
private String eas;
|
||||
//PTC
|
||||
private String ptc;
|
||||
//EPS
|
||||
private String eps;
|
||||
//ABS
|
||||
private String abs;
|
||||
//MCU
|
||||
private String mcu;
|
||||
//动力电池加热状态
|
||||
private String powerBatteryHeatingState;
|
||||
//动力电池当前状态
|
||||
private String powerBattery;
|
||||
//动力电池保温状态
|
||||
private String powerBatteryInsulationState;
|
||||
//DCDC
|
||||
private String dcdc;
|
||||
//CHG
|
||||
private String chg;
|
||||
|
||||
//时间
|
||||
private Long time;
|
||||
|
||||
public String getIdentification() {
|
||||
return identification;
|
||||
}
|
||||
|
||||
public void setIdentification(Object identification) {
|
||||
if(identification!=null) {this.identification = (String) identification;}
|
||||
}
|
||||
|
||||
public String getVinCode() {
|
||||
return vinCode;
|
||||
}
|
||||
|
||||
public void setVinCode(Object vinCode) {
|
||||
if (vinCode!=null) {
|
||||
this.vinCode = (String) vinCode;
|
||||
}
|
||||
}
|
||||
|
||||
public String getLongitude() {
|
||||
return longitude;
|
||||
}
|
||||
|
||||
public void setLongitude(Object longitude) {
|
||||
if (longitude!=null) {
|
||||
this.longitude = (String) longitude;
|
||||
}
|
||||
}
|
||||
|
||||
public String getLatitude() {
|
||||
return latitude;
|
||||
}
|
||||
|
||||
public void setLatitude(Object latitude) {
|
||||
if (latitude!=null) {
|
||||
this.latitude = (String) latitude;
|
||||
}
|
||||
}
|
||||
|
||||
public String getSpeedOfVehicle() {
|
||||
return speedOfVehicle;
|
||||
}
|
||||
|
||||
public void setSpeedOfVehicle(Object speedOfVehicle) {
|
||||
if (speedOfVehicle!=null) {
|
||||
this.speedOfVehicle = (String) speedOfVehicle;
|
||||
}
|
||||
}
|
||||
|
||||
public String getTotalMileage() {
|
||||
return totalMileage;
|
||||
}
|
||||
|
||||
public void setTotalMileage(Object totalMileage) {
|
||||
if (totalMileage!=null) {
|
||||
this.totalMileage = (String) totalMileage;
|
||||
}
|
||||
}
|
||||
|
||||
public String getTotalVoltage() {
|
||||
return totalVoltage;
|
||||
}
|
||||
|
||||
public void setTotalVoltage(Object totalVoltage) {
|
||||
if (totalVoltage!=null) {
|
||||
this.totalVoltage = (String) totalVoltage;
|
||||
}
|
||||
}
|
||||
|
||||
public String getCombinedCurrent() {
|
||||
return combinedCurrent;
|
||||
}
|
||||
|
||||
public void setCombinedCurrent(Object combinedCurrent) {
|
||||
if (combinedCurrent!=null) {
|
||||
this.combinedCurrent = (String) combinedCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
public String getInsulationResistance() {
|
||||
return insulationResistance;
|
||||
}
|
||||
|
||||
public void setInsulationResistance(Object insulationResistance) {
|
||||
if (insulationResistance!=null) {
|
||||
this.insulationResistance = (String) insulationResistance;
|
||||
}
|
||||
}
|
||||
|
||||
public String getGearPosition() {
|
||||
return gearPosition;
|
||||
}
|
||||
|
||||
public void setGearPosition(Object gearPosition) {
|
||||
if (gearPosition!=null) {
|
||||
this.gearPosition = (String) gearPosition;
|
||||
}
|
||||
}
|
||||
|
||||
public String getAcceleratorPedalTravelValue() {
|
||||
return acceleratorPedalTravelValue;
|
||||
}
|
||||
|
||||
public void setAcceleratorPedalTravelValue(Object acceleratorPedalTravelValue) {
|
||||
if (acceleratorPedalTravelValue!=null) {
|
||||
this.acceleratorPedalTravelValue = (String) acceleratorPedalTravelValue;
|
||||
}
|
||||
}
|
||||
|
||||
public String getBrakePedalTravelValue() {
|
||||
return brakePedalTravelValue;
|
||||
}
|
||||
|
||||
public void setBrakePedalTravelValue(Object brakePedalTravelValue) {
|
||||
if (brakePedalTravelValue!=null) {
|
||||
this.brakePedalTravelValue = (String) brakePedalTravelValue;
|
||||
}
|
||||
}
|
||||
|
||||
public String getSpecificFuelConsumption() {
|
||||
return specificFuelConsumption;
|
||||
}
|
||||
|
||||
public void setSpecificFuelConsumption(Object specificFuelConsumption) {
|
||||
if (specificFuelConsumption!=null) {
|
||||
this.specificFuelConsumption = (String) specificFuelConsumption;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorControllerTemperature() {
|
||||
return motorControllerTemperature;
|
||||
}
|
||||
|
||||
public void setMotorControllerTemperature(Object motorControllerTemperature) {
|
||||
if (motorControllerTemperature!=null) {
|
||||
this.motorControllerTemperature = (String) motorControllerTemperature;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorSpeed() {
|
||||
return motorSpeed;
|
||||
}
|
||||
|
||||
public void setMotorSpeed(Object motorSpeed) {
|
||||
if (motorSpeed!=null) {
|
||||
this.motorSpeed = (String) motorSpeed;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorTorque() {
|
||||
return motorTorque;
|
||||
}
|
||||
|
||||
public void setMotorTorque(Object motorTorque) {
|
||||
if (motorTorque!=null) {
|
||||
this.motorTorque = (String) motorTorque;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorTemperature() {
|
||||
return motorTemperature;
|
||||
}
|
||||
|
||||
public void setMotorTemperature(Object motorTemperature) {
|
||||
if (motorTemperature!=null) {
|
||||
this.motorTemperature = (String) motorTemperature;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorVoltage() {
|
||||
return motorVoltage;
|
||||
}
|
||||
|
||||
public void setMotorVoltage(Object motorVoltage) {
|
||||
if (motorVoltage!=null) {
|
||||
this.motorVoltage = (String) motorVoltage;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMotorCurrent() {
|
||||
return motorCurrent;
|
||||
}
|
||||
|
||||
public void setMotorCurrent(Object motorCurrent) {
|
||||
if (motorCurrent!=null) {
|
||||
this.motorCurrent = (String) motorCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
public String getElectricalSoc() {
|
||||
return electricalSoc;
|
||||
}
|
||||
|
||||
public void setElectricalSoc(Object electricalSoc) {
|
||||
if (electricalSoc!=null) {
|
||||
this.electricalSoc = (String) electricalSoc;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMaximumFeedbackPower() {
|
||||
return maximumFeedbackPower;
|
||||
}
|
||||
|
||||
public void setMaximumFeedbackPower(Object maximumFeedbackPower) {
|
||||
if (maximumFeedbackPower!=null) {
|
||||
this.maximumFeedbackPower = (String) maximumFeedbackPower;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMaximumDischargePower() {
|
||||
return maximumDischargePower;
|
||||
}
|
||||
|
||||
public void setMaximumDischargePower(Object maximumDischargePower) {
|
||||
if (maximumDischargePower!=null) {
|
||||
this.maximumDischargePower = (String) maximumDischargePower;
|
||||
}
|
||||
}
|
||||
|
||||
public String getBms() {
|
||||
return bms;
|
||||
}
|
||||
|
||||
public void setBms(Object bms) {
|
||||
if (bms!=null) {
|
||||
this.bms = (String) bms;
|
||||
}
|
||||
}
|
||||
|
||||
public String getBatteryCurrent() {
|
||||
return batteryCurrent;
|
||||
}
|
||||
|
||||
public void setBatteryCurrent(Object batteryCurrent) {
|
||||
if (batteryCurrent!=null) {
|
||||
this.batteryCurrent = (String) batteryCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
public String getV3() {
|
||||
return v3;
|
||||
}
|
||||
|
||||
public void setV3(Object v3) {
|
||||
if (v3!=null) {
|
||||
this.v3 = (String) v3;
|
||||
}
|
||||
}
|
||||
|
||||
public String getSingleMaximumVoltage() {
|
||||
return singleMaximumVoltage;
|
||||
}
|
||||
|
||||
public void setSingleMaximumVoltage(Object singleMaximumVoltage) {
|
||||
if (singleMaximumVoltage!=null) {
|
||||
this.singleMaximumVoltage = (String) singleMaximumVoltage;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMinimumVoltageOfBattery() {
|
||||
return minimumVoltageOfBattery;
|
||||
}
|
||||
|
||||
public void setMinimumVoltageOfBattery(Object minimumVoltageOfBattery) {
|
||||
if (minimumVoltageOfBattery!=null) {
|
||||
this.minimumVoltageOfBattery = (String) minimumVoltageOfBattery;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMaximumBatteryTemperature() {
|
||||
return maximumBatteryTemperature;
|
||||
}
|
||||
|
||||
public void setMaximumBatteryTemperature(Object maximumBatteryTemperature) {
|
||||
if (maximumBatteryTemperature!=null) {
|
||||
this.maximumBatteryTemperature = (String) maximumBatteryTemperature;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMinimumBatteryTemperature() {
|
||||
return minimumBatteryTemperature;
|
||||
}
|
||||
|
||||
public void setMinimumBatteryTemperature(Object minimumBatteryTemperature) {
|
||||
if (minimumBatteryTemperature!=null) {
|
||||
this.minimumBatteryTemperature = (String) minimumBatteryTemperature;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPowerBatteryAvailableCapacity() {
|
||||
return powerBatteryAvailableCapacity;
|
||||
}
|
||||
|
||||
public void setPowerBatteryAvailableCapacity(Object powerBatteryAvailableCapacity) {
|
||||
if (powerBatteryAvailableCapacity!=null) {
|
||||
this.powerBatteryAvailableCapacity = (String) powerBatteryAvailableCapacity;
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getCarStatus() {
|
||||
return carStatus;
|
||||
}
|
||||
|
||||
public void setCarStatus(Object carStatus) {
|
||||
if (carStatus!=null) {
|
||||
this.carStatus = Integer.valueOf(carStatus.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getChargingState() {
|
||||
return chargingState;
|
||||
}
|
||||
|
||||
public void setChargingState(Object chargingState) {
|
||||
if (chargingState!=null) {
|
||||
this.chargingState = Integer.valueOf(chargingState.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getRunningState() {
|
||||
return runningState;
|
||||
}
|
||||
|
||||
public void setRunningState(Object runningState) {
|
||||
if (runningState!=null) {
|
||||
this.runningState = Integer.valueOf(runningState.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public String getSoc() {
|
||||
return soc;
|
||||
}
|
||||
|
||||
public void setSoc(Object soc) {
|
||||
if (soc!=null) {
|
||||
this.soc = (String) soc;
|
||||
}
|
||||
}
|
||||
|
||||
public String getWorkingCondition() {
|
||||
return workingCondition;
|
||||
}
|
||||
|
||||
public void setWorkingCondition(Object workingCondition) {
|
||||
if (workingCondition!=null) {
|
||||
this.workingCondition = (String) workingCondition;
|
||||
}
|
||||
}
|
||||
|
||||
public String getDriveMotorCondition() {
|
||||
return driveMotorCondition;
|
||||
}
|
||||
|
||||
public void setDriveMotorCondition(Object driveMotorCondition) {
|
||||
if (driveMotorCondition!=null) {
|
||||
this.driveMotorCondition = (String) driveMotorCondition;
|
||||
}
|
||||
}
|
||||
|
||||
public String getWhetherTheLocationValid() {
|
||||
return whetherTheLocationValid;
|
||||
}
|
||||
|
||||
public void setWhetherTheLocationValid(Object whetherTheLocationValid) {
|
||||
if (whetherTheLocationValid!=null) {
|
||||
this.whetherTheLocationValid = (String) whetherTheLocationValid;
|
||||
}
|
||||
}
|
||||
|
||||
public String getEas() {
|
||||
return eas;
|
||||
}
|
||||
|
||||
public void setEas(Object eas) {
|
||||
if (eas!=null) {
|
||||
this.eas = (String) eas;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPtc() {
|
||||
return ptc;
|
||||
}
|
||||
|
||||
public void setPtc(Object ptc) {
|
||||
if (ptc!=null) {
|
||||
this.ptc = (String) ptc;
|
||||
}
|
||||
}
|
||||
|
||||
public String getEps() {
|
||||
return eps;
|
||||
}
|
||||
|
||||
public void setEps(Object eps) {
|
||||
if (eps!=null) {
|
||||
this.eps = (String) eps;
|
||||
}
|
||||
}
|
||||
|
||||
public String getAbs() {
|
||||
return abs;
|
||||
}
|
||||
|
||||
public void setAbs(Object abs) {
|
||||
if (abs!=null) {
|
||||
this.abs = (String) abs;
|
||||
}
|
||||
}
|
||||
|
||||
public String getMcu() {
|
||||
return mcu;
|
||||
}
|
||||
|
||||
public void setMcu(Object mcu) {
|
||||
if (mcu!=null) {
|
||||
this.mcu = (String) mcu;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPowerBatteryHeatingState() {
|
||||
return powerBatteryHeatingState;
|
||||
}
|
||||
|
||||
public void setPowerBatteryHeatingState(Object powerBatteryHeatingState) {
|
||||
if (powerBatteryHeatingState!=null) {
|
||||
this.powerBatteryHeatingState = (String) powerBatteryHeatingState;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPowerBattery() {
|
||||
return powerBattery;
|
||||
}
|
||||
|
||||
public void setPowerBattery(Object powerBattery) {
|
||||
if (powerBattery!=null) {
|
||||
this.powerBattery = (String) powerBattery;
|
||||
}
|
||||
}
|
||||
|
||||
public String getPowerBatteryInsulationState() {
|
||||
return powerBatteryInsulationState;
|
||||
}
|
||||
|
||||
public void setPowerBatteryInsulationState(Object powerBatteryInsulationState) {
|
||||
if (powerBatteryInsulationState!=null) {
|
||||
this.powerBatteryInsulationState = (String) powerBatteryInsulationState;
|
||||
}
|
||||
}
|
||||
|
||||
public String getDcdc() {
|
||||
return dcdc;
|
||||
}
|
||||
|
||||
public void setDcdc(Object dcdc) {
|
||||
if (dcdc!=null) {
|
||||
this.dcdc = (String) dcdc;
|
||||
}
|
||||
}
|
||||
|
||||
public String getChg() {
|
||||
return chg;
|
||||
}
|
||||
|
||||
public void setChg(Object chg) {
|
||||
if (chg!=null) {
|
||||
this.chg = (String) chg;
|
||||
}
|
||||
}
|
||||
|
||||
public Long getTime() {
|
||||
return time;
|
||||
}
|
||||
|
||||
public void setTime(Object time) {
|
||||
if (time!=null) {
|
||||
this.time = Long.valueOf(time.toString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
package com.ruoyi.analysis.handler;
|
||||
|
||||
import com.ruoyi.analysis.domain.VehicleMessage;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
|
||||
/**
|
||||
* 报文格式分割
|
||||
*/
|
||||
@Log4j2
|
||||
public class MessageHandler {
|
||||
|
||||
/**
|
||||
*根据转译对数据进行封装实体类
|
||||
*/
|
||||
public static VehicleMessage messageTranslated(String content){
|
||||
//创建对象
|
||||
VehicleMessage vehicleMessage=new VehicleMessage();
|
||||
|
||||
//消息标识
|
||||
vehicleMessage.setIdentification(content.substring(0, 12));
|
||||
|
||||
//VIN码
|
||||
vehicleMessage.setVinCode(content.substring(12,29));
|
||||
|
||||
//经度
|
||||
vehicleMessage.setLongitude(content.substring(29,40));
|
||||
|
||||
//纬度
|
||||
vehicleMessage.setLatitude(content.substring(40,50));
|
||||
|
||||
//车速
|
||||
vehicleMessage.setSpeedOfVehicle(content.substring(50,56));
|
||||
|
||||
//总里程
|
||||
vehicleMessage.setTotalMileage(content.substring(56,67));
|
||||
|
||||
//总电压
|
||||
vehicleMessage.setTotalVoltage(content.substring(67,73));
|
||||
|
||||
//总电流
|
||||
vehicleMessage.setCombinedCurrent(content.substring(73,78));
|
||||
|
||||
//绝缘电阻
|
||||
vehicleMessage.setInsulationResistance(content.substring(78,87));
|
||||
|
||||
//档位
|
||||
vehicleMessage.setGearPosition(content.substring(87,88));
|
||||
|
||||
//加速踏板行程值
|
||||
vehicleMessage.setAcceleratorPedalTravelValue(content.substring(88,90));
|
||||
|
||||
//制动踏板行程值
|
||||
vehicleMessage.setBrakePedalTravelValue(content.substring(90,92));
|
||||
|
||||
//燃料消耗率
|
||||
vehicleMessage.setSpecificFuelConsumption(content.substring(92,97));
|
||||
|
||||
//电机控制器温度
|
||||
vehicleMessage.setMotorControllerTemperature(content.substring(97,103));
|
||||
|
||||
//电机转速
|
||||
vehicleMessage.setMotorSpeed(content.substring(103,108));
|
||||
|
||||
//电机转矩
|
||||
vehicleMessage.setMotorTorque(content.substring(108,112));
|
||||
|
||||
//电机温度
|
||||
vehicleMessage.setMotorTemperature(content.substring(112,118));
|
||||
|
||||
//电机电压
|
||||
vehicleMessage.setMotorVoltage(content.substring(118,123));
|
||||
|
||||
//电机电流
|
||||
vehicleMessage.setMotorCurrent(content.substring(123,131));
|
||||
|
||||
//动力电池剩余电量SOC
|
||||
vehicleMessage.setElectricalSoc(content.substring(131,137));
|
||||
|
||||
//当前状态允许的最大反馈功率
|
||||
vehicleMessage.setMaximumFeedbackPower(content.substring(137,143));
|
||||
|
||||
//当前状态允许最大放电功率
|
||||
vehicleMessage.setMaximumDischargePower(content.substring(143,149));
|
||||
|
||||
//BMS自检计数器
|
||||
vehicleMessage.setBms(content.substring(149,151));
|
||||
|
||||
//动力电池充放电电流
|
||||
vehicleMessage.setBatteryCurrent(content.substring(151,156));
|
||||
|
||||
//动力电池负载端总电压V3
|
||||
vehicleMessage.setV3(content.substring(156,162));
|
||||
|
||||
//单次最大电压
|
||||
vehicleMessage.setSingleMaximumVoltage(content.substring(162,165));
|
||||
|
||||
//单体电池最低电压
|
||||
vehicleMessage.setMinimumVoltageOfBattery(content.substring(165,170));
|
||||
|
||||
//单体电池最高温度
|
||||
vehicleMessage.setMaximumBatteryTemperature(content.substring(170,176));
|
||||
|
||||
//单体电池最低温度
|
||||
vehicleMessage.setMinimumBatteryTemperature(content.substring(176,182));
|
||||
|
||||
//动力电池可用容量
|
||||
vehicleMessage.setPowerBatteryAvailableCapacity(content.substring(182,188));
|
||||
|
||||
//车辆状态
|
||||
vehicleMessage.setCarStatus(Integer.valueOf(content.substring(188,189)));
|
||||
|
||||
//充电状态
|
||||
vehicleMessage.setChargingState(Integer.valueOf(content.substring(189,190)));
|
||||
|
||||
//运行状态
|
||||
vehicleMessage.setRunningState(Integer.valueOf(content.substring(190,191)));
|
||||
|
||||
//SOC
|
||||
vehicleMessage.setSoc(content.substring(191,192));
|
||||
|
||||
//可充电储能装置工作状态
|
||||
vehicleMessage.setWorkingCondition(content.substring(192,193));
|
||||
|
||||
//驱动电机状态
|
||||
vehicleMessage.setDriveMotorCondition(content.substring(193,194));
|
||||
|
||||
//定位是否有效
|
||||
vehicleMessage.setWhetherTheLocationValid(content.substring(194,195));
|
||||
|
||||
//EAS
|
||||
vehicleMessage.setEas(content.substring(195,196));
|
||||
|
||||
//PTC
|
||||
vehicleMessage.setPtc(content.substring(196,197));
|
||||
|
||||
//EPS
|
||||
vehicleMessage.setEps(content.substring(197,198));
|
||||
|
||||
//ABS
|
||||
vehicleMessage.setAbs(content.substring(198,199));
|
||||
|
||||
//MCU
|
||||
vehicleMessage.setMcu(content.substring(199,200));
|
||||
|
||||
//动力电池加热状态
|
||||
vehicleMessage.setPowerBatteryHeatingState(content.substring(200,201));
|
||||
|
||||
//动力电池当前状态
|
||||
vehicleMessage.setPowerBattery(content.substring(201,202));
|
||||
|
||||
//动力电池保温状态
|
||||
vehicleMessage.setPowerBatteryInsulationState(content.substring(202,203));
|
||||
|
||||
//DCDC
|
||||
vehicleMessage.setDcdc(content.substring(203,204));
|
||||
|
||||
//CHG
|
||||
vehicleMessage.setChg(content.substring(204,205));
|
||||
|
||||
//时间
|
||||
vehicleMessage.setTime(Long.valueOf(content.substring(205,218)));
|
||||
|
||||
return vehicleMessage;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package com.ruoyi.analysis.handler;
|
||||
|
||||
import com.ruoyi.analysis.constants.VehicleConstant;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* 报文转换
|
||||
*/
|
||||
public class MessageVerify {
|
||||
|
||||
|
||||
/**
|
||||
* 检测报文是否合法
|
||||
*/
|
||||
public static boolean checkMessageHandler(String message){
|
||||
if(!message.contains(VehicleConstant.DATA_PACK_SEPARATOR)){
|
||||
return false;
|
||||
}
|
||||
String temp = message.replaceAll("#\\$&\\*","");
|
||||
if(message.indexOf(VehicleConstant.MSG_START) !=0) return false;
|
||||
if((message.lastIndexOf(VehicleConstant.MSG_END)+2) != temp.length()) return false;
|
||||
String content = message.substring(2,message.length()-9);
|
||||
String data = content.replace(" ", "");
|
||||
int dSum = 0;
|
||||
int length = data.length();
|
||||
int index = 0;
|
||||
// 遍历十六进制,并计算总和
|
||||
while (index < length) {
|
||||
// 截取2位字符
|
||||
String s = data.substring(index, index + 2);
|
||||
// 十六进制转成十进制 , 并计算十进制的总和
|
||||
dSum += Integer.parseInt(s, 16);
|
||||
index = index + 2;
|
||||
}
|
||||
// 用256取余,十六进制最大是FF,FF的十进制是255
|
||||
int mod = dSum % 256;
|
||||
// 余数转成十六进制
|
||||
String checkSumHex = Integer.toHexString(mod);
|
||||
length = checkSumHex.length();
|
||||
if (length < 2) {
|
||||
// 校验位不足两位的,在前面补0
|
||||
checkSumHex = "0" + checkSumHex;
|
||||
}
|
||||
String parityBit = message.substring(message.length() - 9,message.length() - 7);
|
||||
return parityBit.equals(checkSumHex);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 将16进制数据转为字符
|
||||
*/
|
||||
public static String theDecimal(String hexString){
|
||||
String[] split = hexString.split(" ");
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
for (String info : split) {
|
||||
if(StringUtils.isNotEmpty(info)){
|
||||
String decimalTemp = valueToAscii(Integer.valueOf(info, 16).toString());
|
||||
stringBuilder.append(decimalTemp);
|
||||
}
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 将10进制数据转为字符
|
||||
*/
|
||||
public static String valueToAscii(String value){
|
||||
StringBuilder builder = new StringBuilder();
|
||||
String[] chars = value.split(",");
|
||||
for (String info : chars) {
|
||||
builder.append((char)Integer.parseInt(info));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package com.ruoyi.analysis.hbase.config;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.hbase.exception.HBaseException;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Hbase配置类
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
|
||||
public class HbaseConfig {
|
||||
/**
|
||||
* 配置文件前缀
|
||||
*/
|
||||
public static final String CONF_PREFIX = "hbase.conf";
|
||||
|
||||
private Map<String,String> confMaps;
|
||||
|
||||
public Map<String, String> getConfMaps() {
|
||||
if(confMaps == null){
|
||||
throw new HBaseException("请配置:[hbase.conf.confMaps.zookeeper.quorum]");
|
||||
}
|
||||
return confMaps;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将配置映射给当前对象
|
||||
*/
|
||||
public void setConfMaps(Map<String, String> confMaps) {
|
||||
this.confMaps = confMaps;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.ruoyi.analysis.hbase.exception;
|
||||
|
||||
/**
|
||||
* 封装Hbase的异常类
|
||||
*/
|
||||
public class HBaseException extends RuntimeException{
|
||||
public HBaseException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public HBaseException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package com.ruoyi.analysis.hbase.instance;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.hbase.config.HbaseConfig;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* HBase连接和管理实例
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
public class HBaseInstance {
|
||||
private ExecutorService pool = Executors.newScheduledThreadPool(20); //设置hbase连接池
|
||||
|
||||
/**
|
||||
* 初始化连接
|
||||
*/
|
||||
@Bean(name = "hbaseConnection")
|
||||
public Connection initConnection(HbaseConfig hbaseConfig){
|
||||
try {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
//将hbase配置类中定义的配置加载到连接池中每个连接里
|
||||
Map<String, String> confMap = hbaseConfig.getConfMaps();
|
||||
for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
|
||||
conf.set(confEntry.getKey(), confEntry.getValue());
|
||||
}
|
||||
return ConnectionFactory.createConnection(conf, pool);
|
||||
} catch (IOException e) {
|
||||
log.error("连接HBase出错",e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Hbase管理APi
|
||||
*/
|
||||
@Bean(name = "hbaseAdmin")
|
||||
public Admin initAdmin(Connection connection){
|
||||
try {
|
||||
return connection.getAdmin();
|
||||
} catch (IOException e) {
|
||||
log.error("创建HBase管理API出错",e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
package com.ruoyi.analysis.hbase.service;
|
||||
|
||||
|
||||
import com.ruoyi.analysis.domain.VehicleMessage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Hbase接口
|
||||
*/
|
||||
|
||||
public interface HBaseService {
|
||||
/**
|
||||
* 创建表
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param columnFamily 列族(数组)
|
||||
*/
|
||||
public void createTable(String tableName, String[] columnFamily) throws IOException;
|
||||
/**
|
||||
* 插入记录(单行单列族-多列多值)
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param row 行名
|
||||
* @param columnFamilys 列族名
|
||||
* @param columns 列名(数组)
|
||||
* @param values 值(数组)(且需要和列一一对应)
|
||||
*/
|
||||
public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException;
|
||||
|
||||
/**
|
||||
* 插入记录(单行单列族-多列多值)
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param row 行名
|
||||
* @param columnFamilys 列族名
|
||||
* @param vehicleMessage 对象
|
||||
*/
|
||||
public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException;
|
||||
|
||||
/**
|
||||
* 插入记录(单行单列族-单列单值)
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param row 行名
|
||||
* @param columnFamily 列族名
|
||||
* @param column 列名
|
||||
* @param value 值
|
||||
*/
|
||||
public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException;
|
||||
|
||||
/**
|
||||
* 删除一行记录
|
||||
*
|
||||
* @param tablename 表名
|
||||
* @param rowkey 行名
|
||||
*/
|
||||
public void deleteRow(String tablename, String rowkey) throws IOException;
|
||||
|
||||
/**
|
||||
* 删除单行单列族记录
|
||||
* @param tablename 表名
|
||||
* @param rowkey 行名
|
||||
* @param columnFamily 列族名
|
||||
*/
|
||||
public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException;
|
||||
|
||||
/**
|
||||
* 删除单行单列族单列记录
|
||||
*
|
||||
* @param tablename 表名
|
||||
* @param rowkey 行名
|
||||
* @param columnFamily 列族名
|
||||
* @param column 列名
|
||||
*/
|
||||
public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException;
|
||||
|
||||
/**
|
||||
* 查找一行记录
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param rowKey 行名
|
||||
*/
|
||||
public String selectRow(String tableName, String rowKey) throws IOException;
|
||||
|
||||
/**
|
||||
* 查找单行单列族单列记录
|
||||
*
|
||||
* @param tablename 表名
|
||||
* @param rowKey 行名
|
||||
* @param columnFamily 列族名
|
||||
* @param column 列名
|
||||
* @return
|
||||
*/
|
||||
public String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException;
|
||||
|
||||
/**
|
||||
* 根据rowkey关键字查询报告记录
|
||||
*
|
||||
* @param tablename
|
||||
* @param rowKeyword
|
||||
* @return
|
||||
*/
|
||||
public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException;
|
||||
|
||||
/**
|
||||
* 删除表操作
|
||||
*
|
||||
* @param tablename
|
||||
*/
|
||||
public void deleteTable(String tablename) throws IOException;
|
||||
|
||||
/**
|
||||
* 范围查询
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<VehicleMessage> scanRange(String tablename, String start, String end) throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException;
|
||||
|
||||
/**
|
||||
* 根据起始范围查询
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<VehicleMessage> scanRangeByStart(String tablename, String start) throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException;
|
||||
|
||||
/**
|
||||
* 查询全部
|
||||
*/
|
||||
public void selectAll(String tablename) throws IOException;
|
||||
|
||||
/**
|
||||
* 判断表名是否存在
|
||||
* @param tableName
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean isTableExists(String tableName) throws IOException;
|
||||
|
||||
}
|
|
@ -0,0 +1,277 @@
|
|||
package com.ruoyi.analysis.hbase.service.impl;
|
||||
|
||||
import com.ruoyi.analysis.domain.VehicleMessage;
|
||||
import com.ruoyi.analysis.hbase.service.HBaseService;
|
||||
import com.ruoyi.analysis.util.BeanUtils;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
import org.apache.hadoop.hbase.filter.SubstringComparator;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* HBaseService接口的实现类
|
||||
*/
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
public class HBaseServiceImpl implements HBaseService {
|
||||
@Autowired
|
||||
private Admin admin;
|
||||
|
||||
@Autowired
|
||||
private Connection connection;
|
||||
|
||||
@Override
|
||||
public void createTable(String tableName, String[] columnFamily) throws IOException {
|
||||
TableName name = TableName.valueOf(tableName);
|
||||
//如果存在则删除
|
||||
if (admin.tableExists(name)) {
|
||||
admin.disableTable(name);
|
||||
admin.deleteTable(name);
|
||||
log.error("该数据表已经被创建,表名:{}", name);
|
||||
} else {
|
||||
HTableDescriptor desc = new HTableDescriptor(name);
|
||||
for (String cf : columnFamily) {
|
||||
desc.addFamily(new HColumnDescriptor(cf));
|
||||
}
|
||||
admin.createTable(desc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
|
||||
TableName name = TableName.valueOf(tableName);
|
||||
Table table = connection.getTable(name);
|
||||
Put put = new Put(Bytes.toBytes(row));
|
||||
for (int i = 0; i < columns.length; i++) {
|
||||
put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException {
|
||||
TableName name = TableName.valueOf(tableName);
|
||||
Table table = connection.getTable(name);
|
||||
Put put = new Put(Bytes.toBytes(row));
|
||||
Class<? extends VehicleMessage> vehicleMessageClass = vehicleMessage.getClass();
|
||||
Field[] fields = vehicleMessageClass.getDeclaredFields();
|
||||
List<String> columns = Arrays.stream(fields).map(Field::getName).collect(Collectors.toList());
|
||||
List<String> values = Arrays.stream(fields).map(temp -> {
|
||||
return String.valueOf(BeanUtils.getGetterMethodValue(vehicleMessageClass, temp.getName(), vehicleMessage));
|
||||
}).collect(Collectors.toList());
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
|
||||
TableName name = TableName.valueOf(tableName);
|
||||
Table table = connection.getTable(name);
|
||||
Put put = new Put(Bytes.toBytes(row));
|
||||
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
|
||||
table.put(put);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteRow(String tablename, String rowkey) throws IOException {
|
||||
TableName name = TableName.valueOf(tablename);
|
||||
Table table = connection.getTable(name);
|
||||
Delete d = new Delete(rowkey.getBytes());
|
||||
table.delete(d);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
|
||||
TableName name = TableName.valueOf(tablename);
|
||||
Table table = connection.getTable(name);
|
||||
Delete d = new Delete(rowkey.getBytes()).addFamily(Bytes.toBytes(columnFamily));
|
||||
table.delete(d);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
|
||||
TableName name = TableName.valueOf(tablename);
|
||||
Table table = connection.getTable(name);
|
||||
Delete d = new Delete(rowkey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
|
||||
table.delete(d);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String selectRow(String tableName, String rowKey) throws IOException {
|
||||
String record = "";
|
||||
TableName name=TableName.valueOf(tableName);
|
||||
Table table = connection.getTable(name);
|
||||
Get g = new Get(rowKey.getBytes());
|
||||
Result rs = table.get(g);
|
||||
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = rs.getMap();
|
||||
for (Cell cell : rs.rawCells()) {
|
||||
StringBuffer stringBuffer = new StringBuffer()
|
||||
.append(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())).append("\t")
|
||||
.append(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())).append("\t")
|
||||
.append(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())).append("\t")
|
||||
.append(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())).append("\n");
|
||||
String str = stringBuffer.toString();
|
||||
record += str;
|
||||
}
|
||||
return record;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
|
||||
TableName name=TableName.valueOf(tablename);
|
||||
Table table = connection.getTable(name);
|
||||
Get g = new Get(rowKey.getBytes());
|
||||
g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
|
||||
Result rs = table.get(g);
|
||||
return Bytes.toString(rs.value());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
|
||||
ArrayList<Object> list = new ArrayList<Object>();
|
||||
|
||||
Table table = connection.getTable(TableName.valueOf(tablename));
|
||||
Scan scan = new Scan();
|
||||
|
||||
//添加行键过滤器,根据关键字匹配
|
||||
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
|
||||
scan.setFilter(rowFilter);
|
||||
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
try {
|
||||
for (Result result : scanner) {
|
||||
//TODO 此处根据业务来自定义实现
|
||||
list.add(null);
|
||||
}
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteTable(String tablename) throws IOException {
|
||||
TableName name=TableName.valueOf(tablename);
|
||||
if(admin.tableExists(name)) {
|
||||
admin.disableTable(name);
|
||||
admin.deleteTable(name);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VehicleMessage> scanRange(String tablename, String start, String end) throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException {
|
||||
Table table = connection.getTable(TableName.valueOf(tablename));
|
||||
Scan scan = new Scan();
|
||||
//设置起始和结束的rowkey,范围值扫描包前不包后:只能扫到0005不能扫到0006.没有设置范围将会全局扫描
|
||||
scan.setStartRow(start.getBytes());
|
||||
scan.setStopRow(end.getBytes());
|
||||
//返回多条数据结果值都封装在resultScanner里面了
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
ArrayList<VehicleMessage> messages = new ArrayList<>();
|
||||
Class<VehicleMessage> aClass = VehicleMessage.class;
|
||||
for (Result result : scanner){
|
||||
VehicleMessage vehicleMessage = new VehicleMessage();
|
||||
List<Cell> cells = result.listCells();
|
||||
for (Cell cell : cells){
|
||||
// String rowkey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
|
||||
//获取列族名
|
||||
// String familyName = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
|
||||
//获取列名
|
||||
String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
|
||||
String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
|
||||
Method setterMethod = BeanUtils.getSetterMethod(aClass,Object.class, columnName);
|
||||
setterMethod.invoke(vehicleMessage, value);
|
||||
}
|
||||
messages.add(vehicleMessage);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VehicleMessage> scanRangeByStart(String tablename, String start) throws IOException, NoSuchFieldException, InvocationTargetException, IllegalAccessException {
|
||||
Table table = connection.getTable(TableName.valueOf(tablename));
|
||||
Scan scan = new Scan();
|
||||
//设置起始和结束的rowkey,范围值扫描包前不包后:只能扫到0005不能扫到0006.没有设置范围将会全局扫描
|
||||
scan.setStartRow(start.getBytes());
|
||||
//返回多条数据结果值都封装在resultScanner里面了
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
ArrayList<VehicleMessage> messages = new ArrayList<>();
|
||||
Class<VehicleMessage> aClass = VehicleMessage.class;
|
||||
for (Result result : scanner){
|
||||
VehicleMessage vehicleMessage = new VehicleMessage();
|
||||
List<Cell> cells = result.listCells();
|
||||
for (Cell cell : cells){
|
||||
String rowkey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
|
||||
//获取列族名
|
||||
// String familyName = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
|
||||
//获取列名
|
||||
String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
|
||||
String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
|
||||
Method setterMethod = BeanUtils.getSetterMethod(aClass,Object.class, columnName);
|
||||
// log.info("rowKey:"+rowkey + " column:"+columnName + " value:"+value);
|
||||
setterMethod.invoke(vehicleMessage, value);
|
||||
}
|
||||
messages.add(vehicleMessage);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void selectAll(String tablename) throws IOException {
|
||||
Table table = connection.getTable(TableName.valueOf(tablename));
|
||||
Scan scan = new Scan();
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Class<VehicleMessage> aClass = VehicleMessage.class;
|
||||
for (Result result : scanner){
|
||||
List<Cell> cells = result.listCells();
|
||||
for (Cell cell : cells){
|
||||
String rowkey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
|
||||
//获取列族名
|
||||
String familyName = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
|
||||
//获取列名
|
||||
String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
|
||||
String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
|
||||
log.info("rowKey:"+rowkey + " column:"+columnName + " value:"+value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTableExists(String tableName) throws IOException {
|
||||
Admin admin = connection.getAdmin();
|
||||
//调用API进行判断表是否存在
|
||||
return admin.tableExists(TableName.valueOf(tableName));
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
package com.ruoyi.analysis.util;
|
||||
|
||||
import com.ruoyi.common.core.exception.ServiceException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class BeanUtils extends org.springframework.beans.BeanUtils {
|
||||
private static final Logger log = LoggerFactory.getLogger(BeanUtils.class);
|
||||
private static final int BEAN_METHOD_PROP_INDEX = 3;
|
||||
private static final Pattern GET_PATTERN = Pattern.compile("get(\\p{javaUpperCase}\\w*)");
|
||||
private static final Pattern SET_PATTERN = Pattern.compile("set(\\p{javaUpperCase}\\w*)");
|
||||
public static final String SET_PRE = "set";
|
||||
public static final String GET_PRE = "get";
|
||||
private static Pattern humpPattern = Pattern.compile("[A-Z]");
|
||||
private static Pattern linePattern = Pattern.compile("_(\\w)");
|
||||
|
||||
public BeanUtils() {
|
||||
}
|
||||
|
||||
public static String lineToHump(String str) {
|
||||
str = str.toLowerCase();
|
||||
Matcher matcher = linePattern.matcher(str);
|
||||
StringBuffer sb = new StringBuffer();
|
||||
|
||||
while (matcher.find()) {
|
||||
matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
|
||||
}
|
||||
|
||||
matcher.appendTail(sb);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static String humpToLine(String str) {
|
||||
Matcher matcher = humpPattern.matcher(str);
|
||||
StringBuffer sb = new StringBuffer();
|
||||
|
||||
while (matcher.find()) {
|
||||
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
|
||||
}
|
||||
|
||||
matcher.appendTail(sb);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static String captureName(String name) {
|
||||
name = name.substring(0, 1).toLowerCase() + name.substring(1);
|
||||
return name;
|
||||
}
|
||||
|
||||
public static String upperName(String name) {
|
||||
return name.substring(0, 1).toUpperCase() + name.substring(1);
|
||||
}
|
||||
|
||||
public static String getMethodName(String name) {
|
||||
return "get" + upperName(name);
|
||||
}
|
||||
|
||||
public static String setMethodName(String name) {
|
||||
return "set" + upperName(name);
|
||||
}
|
||||
|
||||
public static void copyBeanProp(Object dest, Object src) {
|
||||
try {
|
||||
copyProperties(src, dest);
|
||||
} catch (Exception var3) {
|
||||
var3.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Method getGetterMethod(Class<?> clazz, String name) {
|
||||
String methodName = getMethodName(name);
|
||||
|
||||
try {
|
||||
return clazz.getMethod(methodName);
|
||||
} catch (NoSuchMethodException var4) {
|
||||
log.error("获取方法执行对象是失败,类:【{}】, 方法名:【{}】", new Object[]{clazz.getName(), methodName, var4});
|
||||
throw new RuntimeException(var4);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T valueToT(Object object, Class<T> clazz) {
|
||||
try {
|
||||
return !StringUtils.isEmpty((CharSequence)object) ? (T) object : null;
|
||||
} catch (Exception var3) {
|
||||
throw new ServiceException("程序类型转换异常Object -> " + clazz.getName());
|
||||
}
|
||||
}
|
||||
|
||||
public static Object getGetterMethodValue(Class<?> clazz, String name, Object obj) {
|
||||
Method method = getGetterMethod(clazz, name);
|
||||
|
||||
try {
|
||||
Object value = method.invoke(obj);
|
||||
return value != null && value.toString() != "" ? value : null;
|
||||
} catch (IllegalAccessException var5) {
|
||||
throw new RuntimeException(var5);
|
||||
} catch (InvocationTargetException var6) {
|
||||
throw new RuntimeException(var6);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Method> getSetterMethods(Object obj) {
|
||||
List<Method> setterMethods = new ArrayList();
|
||||
Method[] methods = obj.getClass().getMethods();
|
||||
Method[] var3 = methods;
|
||||
int var4 = methods.length;
|
||||
|
||||
for(int var5 = 0; var5 < var4; ++var5) {
|
||||
Method method = var3[var5];
|
||||
Matcher m = SET_PATTERN.matcher(method.getName());
|
||||
if (m.matches() && method.getParameterTypes().length == 1) {
|
||||
setterMethods.add(method);
|
||||
}
|
||||
}
|
||||
|
||||
return setterMethods;
|
||||
}
|
||||
|
||||
public static List<Method> getGetterMethods(Object obj) {
|
||||
List<Method> getterMethods = new ArrayList();
|
||||
Method[] methods = obj.getClass().getMethods();
|
||||
Method[] var3 = methods;
|
||||
int var4 = methods.length;
|
||||
|
||||
for(int var5 = 0; var5 < var4; ++var5) {
|
||||
Method method = var3[var5];
|
||||
Matcher m = GET_PATTERN.matcher(method.getName());
|
||||
if (m.matches() && method.getParameterTypes().length == 0) {
|
||||
getterMethods.add(method);
|
||||
}
|
||||
}
|
||||
|
||||
return getterMethods;
|
||||
}
|
||||
|
||||
public static boolean isMethodPropEquals(String m1, String m2) {
|
||||
return m1.substring(3).equals(m2.substring(3));
|
||||
}
|
||||
|
||||
public static Method getSetterMethod(Class<?> cls, Class<?> filedType, String name) {
|
||||
String methodName = setMethodName(name);
|
||||
|
||||
try {
|
||||
Method method = cls.getMethod(methodName, filedType);
|
||||
return method;
|
||||
} catch (NoSuchMethodException var5) {
|
||||
log.error("获取方法执行对象失败:类[{}],方法名[{}]", cls.getName(), methodName);
|
||||
throw new RuntimeException(var5);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
spring:
|
||||
rabbitmq:
|
||||
host: 10.100.27.4
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: /
|
||||
listener:
|
||||
simple:
|
||||
prefetch: 1
|
||||
acknowledge-mode: manual
|
||||
retry:
|
||||
enabled: true
|
||||
publisher-confirm-type: correlated
|
||||
publisher-returns: true
|
||||
template:
|
||||
mandatory: true
|
||||
kafka:
|
||||
producer:
|
||||
# Kafka服务器
|
||||
bootstrap-servers: 43.142.96.146:9092
|
||||
# 开启事务,必须在开启了事务的方法中发送,否则报错
|
||||
transaction-id-prefix: kafkaTx-
|
||||
# 发生错误后,消息重发的次数,开启事务必须设置大于0。
|
||||
retries: 3
|
||||
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
|
||||
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
|
||||
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
|
||||
# 开启事务时,必须设置为all
|
||||
acks: all
|
||||
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
|
||||
batch-size: 16384
|
||||
# 生产者内存缓冲区的大小。
|
||||
buffer-memory: 1024000
|
||||
# 键的序列化方式
|
||||
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||||
# 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
|
||||
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||||
properties:
|
||||
bootstrap.servers: 43.142.96.146:9092
|
||||
auto.create.topics: true
|
||||
|
||||
consumer:
|
||||
# Kafka服务器
|
||||
bootstrap-servers: 43.142.96.146:9092
|
||||
group-id: firstGroup
|
||||
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
|
||||
#auto-commit-interval: 2s
|
||||
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
|
||||
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
|
||||
# latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
|
||||
# none: 当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
|
||||
auto-offset-reset: latest
|
||||
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
|
||||
enable-auto-commit: false
|
||||
# 键的反序列化方式
|
||||
#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||||
# 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
|
||||
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
|
||||
# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
|
||||
properties:
|
||||
bootstrap.servers: 43.142.96.146:9092
|
||||
allow.auto.create.topics: true
|
||||
spring:
|
||||
json:
|
||||
trusted:
|
||||
packages: "*"
|
||||
# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
|
||||
# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
|
||||
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
|
||||
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
|
||||
# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
|
||||
# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
|
||||
max-poll-records: 3
|
||||
properties:
|
||||
# 两次 poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
|
||||
max:
|
||||
poll:
|
||||
interval:
|
||||
ms: 600000
|
||||
# 当 broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
|
||||
session:
|
||||
timeout:
|
||||
ms: 10000
|
||||
listener:
|
||||
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
|
||||
concurrency: 4
|
||||
# 自动提交关闭,需要设置手动消息确认
|
||||
ack-mode: manual_immediate
|
||||
# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
|
||||
missing-topics-fatal: false
|
||||
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
|
||||
poll-timeout: 600000
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
dynamic:
|
||||
thread-pool:
|
||||
enable: true
|
||||
banner: true
|
||||
collect: true
|
||||
# 全局通知配置
|
||||
alarm: true # 是否报警
|
||||
check-state-interval: 3000 # 检查线程池状态,是否达到报警条件,单位毫秒
|
||||
active-alarm: 80 # 活跃度报警阈值;假设线程池最大线程数 10,当线程数达到 8 发起报警
|
||||
capacity-alarm: 80 # 容量报警阈值;假设阻塞队列容量 100,当容量达到 80 发起报警
|
||||
alarm-interval: 8 # 报警间隔,同一线程池下同一报警纬度,在 interval 时间内只会报警一次,单位秒
|
||||
receive: xxx # 企业微信填写用户 ID(填写其它将无法达到 @ 效果)、钉钉填手机号、飞书填 ou_ 开头唯一 ID
|
||||
executors:
|
||||
- thread-pool-id: 'analysis' # 线程池标识
|
||||
core-pool-size: 16 # 核心线程数
|
||||
maximum-pool-size: 20 # 最大线程数
|
||||
queue-capacity: 2147483647 # 阻塞队列大小
|
||||
execute-time-out: 10000 # 执行超时时间,超过此时间发起报警
|
||||
blocking-queue: 'LinkedBlockingQueue' # 阻塞队列名称,参考 QueueTypeEnum,支持 SPI
|
||||
rejected-handler: 'AbortPolicy' # 拒绝策略名称,参考 RejectedPolicies,支持 SPI
|
||||
keep-alive-time: 1024 # 线程存活时间,单位秒
|
||||
allow-core-thread-time-out: true # 是否允许核心线程超时
|
||||
thread-name-prefix: 'analysis' # 线程名称前缀
|
||||
notify: # 通知配置,线程池中通知配置如果存在,则会覆盖全局通知配置
|
||||
is-alarm: true # 是否报警
|
||||
active-alarm: 80 # 活跃度报警阈值;假设线程池最大线程数 10,当线程数达到 8 发起报警
|
||||
capacity-alarm: 80 # 容量报警阈值;假设阻塞队列容量 100,当容量达到 80 发起报警
|
||||
interval: 8 # 报警间隔,同一线程池下同一报警纬度,在 interval 时间内只会报警一次,单位分钟
|
||||
receive: xxx # 企业微信填写用户 ID(填写其它将无法达到 @ 效果)、钉钉填手机号、飞书填 ou_ 开头唯一 ID
|
||||
collect-type: metric
|
||||
hbase:
|
||||
conf:
|
||||
confMaps:
|
||||
hbase:
|
||||
zookeeper:
|
||||
quorum: 43.142.96.146:2181
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
Spring Boot Version: ${spring-boot.version}
|
||||
Spring Application Name: ${spring.application.name}
|
||||
_ _
|
||||
(_) | |
|
||||
_ __ _ _ ___ _ _ _ ______ ___ _ _ ___ | |_ ___ _ __ ___
|
||||
| '__|| | | | / _ \ | | | || ||______|/ __|| | | |/ __|| __| / _ \| '_ ` _ \
|
||||
| | | |_| || (_) || |_| || | \__ \| |_| |\__ \| |_ | __/| | | | | |
|
||||
|_| \__,_| \___/ \__, ||_| |___/ \__, ||___/ \__| \___||_| |_| |_|
|
||||
__/ | __/ |
|
||||
|___/ |___/
|
|
@ -0,0 +1,33 @@
|
|||
# Tomcat
|
||||
server:
|
||||
port: 3338
|
||||
netty:
|
||||
port: 8081
|
||||
boss-group-thread-number: 5
|
||||
worker-group-thread-number: 20
|
||||
# Spring
|
||||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: ruoyi-analysis
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: dev
|
||||
cloud:
|
||||
nacos:
|
||||
discovery:
|
||||
dns:
|
||||
servers: 100.100.2.136
|
||||
# 服务注册地址
|
||||
server-addr: 10.100.27.4:8848
|
||||
config:
|
||||
# 配置中心地址
|
||||
server-addr: 10.100.27.4:8848
|
||||
# 配置文件格式
|
||||
file-extension: yml
|
||||
# 共享配置
|
||||
shared-configs:
|
||||
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="ruoyi-analysis" />
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 系统日志输出 -->
|
||||
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>ERROR</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- 系统模块日志级别控制 -->
|
||||
<logger name="com.yun" level="info" />
|
||||
<!-- Spring日志级别控制 -->
|
||||
<logger name="org.springframework" level="warn" />
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="file_info" />
|
||||
<appender-ref ref="file_error" />
|
||||
</root>
|
||||
</configuration>
|
Loading…
Reference in New Issue