添加Dockerfile + 报文数据解析

master
Yang Haoyu 2023-11-26 18:33:09 +08:00
parent 56aa9a2614
commit cd38ae2edd
17 changed files with 447 additions and 19 deletions

21
Dockerfile 100644
View File

@ -0,0 +1,21 @@
FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6
# 暴露端口号
EXPOSE 10003/tcp
# 挂载目录位置
VOLUME /home/logs/fate-common-datasource
#构造 复制外部文件到docker 内部
COPY /fate-modules-service/target/fance-menghang-service.jar /home/app.jar
# 工作目录 exec -it 进来就是默认这个目录
WORKDIR /home
# 指定东八区
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > .etc.timezone
# 启动java程序
CMD ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"]

View File

@ -1,9 +1,9 @@
package com.shiyi.analysis;
import com.fate.common.core.exception.ServiceException;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -19,8 +19,10 @@ import static org.springframework.beans.BeanUtils.copyProperties;
* @Author : YangHaoYu
* @Date: 2023-11-22 21:49
*/
@Log4j2
public class BeanUtils {
private static final Logger log = LoggerFactory.getLogger(BeanUtils.class);
//private static final Logger log = LoggerFactory.getLogger(BeanUtils.class);
/** Bean方法名中属性名开始的下标 */
private static final int BEAN_METHOD_PROP_INDEX = 3;

View File

@ -0,0 +1,13 @@
package com.shiyi.analysis.constants;
/**
* @Description : Kafka
* @Author : YangHaoYu
* @Date: 2023-11-24 13:55
*/
public class KafkaConstant {
public static final String ANALYSIS_MESSAGE = "analysis_message";
public static final String KAFKA_SERVERS = "123.249.90.97:9092";
}

View File

@ -2,10 +2,9 @@ package com.shiyi.analysis.utils.reflex;
import com.shiyi.analysis.utils.date.DateUtils;
import com.shiyi.analysis.utils.text.Convert;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.util.Date;
@ -15,6 +14,7 @@ import java.util.Date;
*
* @author markGuo
*/
@Log4j2
@SuppressWarnings("rawtypes")
public class ReflectUtils {
/**
@ -35,7 +35,7 @@ public class ReflectUtils {
/**
*
*/
private static Logger logger = LoggerFactory.getLogger(ReflectUtils.class);
//private static Logger logger = LoggerFactory.getLogger(ReflectUtils.class);
/**
* Getter.
@ -88,14 +88,14 @@ public class ReflectUtils {
public static <E> E getFieldValue(final Object obj, final String fieldName) {
Field field = getAccessibleField(obj, fieldName);
if (field == null) {
logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 ");
log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 ");
return null;
}
E result = null;
try {
result = (E) field.get(obj);
} catch (IllegalAccessException e) {
logger.error("不可能抛出的异常{}", e.getMessage());
log.error("不可能抛出的异常{}", e.getMessage());
}
return result;
}
@ -111,13 +111,13 @@ public class ReflectUtils {
Field field = getAccessibleField(obj, fieldName);
if (field == null) {
// throw new IllegalArgumentException("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 ");
logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 ");
log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 ");
return;
}
try {
field.set(obj, value);
} catch (IllegalAccessException e) {
logger.error("不可能抛出的异常: {}", e.getMessage());
log.error("不可能抛出的异常: {}", e.getMessage());
}
}
@ -140,7 +140,7 @@ public class ReflectUtils {
}
Method method = getAccessibleMethod(obj, methodName, parameterTypes);
if (method == null) {
logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 ");
log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 ");
return null;
}
try {
@ -166,7 +166,7 @@ public class ReflectUtils {
Method method = getAccessibleMethodByName(obj, methodName, args.length);
if (method == null) {
// 如果为空不报错,直接返回空。
logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 ");
log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 ");
return null;
}
try {
@ -329,19 +329,19 @@ public class ReflectUtils {
Type genType = clazz.getGenericSuperclass();
if (!(genType instanceof ParameterizedType)) {
logger.debug(clazz.getSimpleName() + "'s superclass not ParameterizedType");
log.debug(clazz.getSimpleName() + "'s superclass not ParameterizedType");
return Object.class;
}
Type[] params = ((ParameterizedType) genType).getActualTypeArguments();
if (index >= params.length || index < 0) {
logger.debug("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: "
log.debug("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: "
+ params.length);
return Object.class;
}
if (!(params[index] instanceof Class)) {
logger.debug(clazz.getSimpleName() + " not set the actual class on superclass generic parameter");
log.debug(clazz.getSimpleName() + " not set the actual class on superclass generic parameter");
return Object.class;
}

View File

@ -15,6 +15,11 @@ import java.util.List;
@FeignClient(name = "fate-fence")
public interface AnalysisRemote {
/**
*
* @param drivingRecord
* @return
*/
@GetMapping("/list")
public Result<List<DrivingRecord>> list(DrivingRecord drivingRecord);
}

View File

@ -0,0 +1,31 @@
package com.shiyi.analysis.feign;
import com.fate.common.core.domain.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
/**
* @Description :
* @Author : YangHaoYu
* @Date: 2023-11-24 09:28
*/
@FeignClient(name = "fate-fence")
public interface VehicleFeign {
/**
*
* @param vin
* @param status
* @return
*/
@PostMapping("/updateVehicleStatus/{vin}/{status}")
public Result updateVehicleStatus(@PathVariable("vin") String vin, @PathVariable("status")Integer status);
// @GetMapping("/vehicleList")
// public Result<Car> vehicleLists();
}

View File

@ -4,6 +4,7 @@ target/
!**/src/test/**/target/
### IntelliJ IDEA ###
/.idea
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
@ -35,4 +36,4 @@ build/
.vscode/
### Mac OS ###
.DS_Store
.DS_Store

View File

@ -43,7 +43,65 @@
<artifactId>fate-modules-fence</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.example</groupId>
<artifactId>fate-modules-remote</artifactId>
<version>3.6.3</version>
<scope>compile</scope>
</dependency>
<!-- RabbitMq消息队列依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- hippo4j-->
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-core-spring-boot-starter</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>1.7.36</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.7.2</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,11 +1,30 @@
package com.shiyi.analysis.config;
import com.alibaba.fastjson2.JSON;
import com.shiyi.analysis.constants.HBaseConstant;
import com.shiyi.analysis.constants.KafkaConstant;
import com.shiyi.analysis.constants.RabbitConstants;
import com.shiyi.analysis.constants.RedisConstants;
import com.shiyi.analysis.domain.VehicleMessage;
import com.shiyi.analysis.feign.VehicleFeign;
import com.shiyi.analysis.handler.MessageHandler;
import com.shiyi.analysis.service.HbaseService;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description : KafKa
* @Author : YangHaoYu
@ -21,6 +40,66 @@ public class KafkaConsumer {
@Autowired
private RedisTemplate redisTemplate;
// @Autowired
// private VehicleFeign vehicleFeign;
@Autowired
private VehicleFeign vehicleFeign;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ThreadPoolExecutor alanAnalysisThreadPool;
private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
private SimpleDateFormat tableNamemat = new SimpleDateFormat("yyyy-MM-dd");
/**
*kafka
* @param consumerRecord
*/
@KafkaListener(topics = KafkaConstant.ANALYSIS_MESSAGE)
public void OnMmessage(ConsumerRecord<?,?> consumerRecord){
//提交到线程池里
alanAnalysisThreadPool.submit(() ->{
});
}
/**
*
* @param consumerRecord ConsumerRecord
*/
public void messageHandler(ConsumerRecord<?,?> consumerRecord){
Optional<?> value = Optional.ofNullable(consumerRecord.value());
log.info("partition:"+consumerRecord.partition());
log.info("key:"+consumerRecord.key());
if (value.isPresent()){
// 获取到消息内容
Object object = value.get();
log.info("topic{},分区:{}KafKa接收到的消息{}","vehicle_message",consumerRecord.partition(),object);
String objectMessage = (String) object;
// 将车辆消息转换为实体类
VehicleMessage vehicleMessage = MessageHandler.messageTranslated(objectMessage);
// 将车辆实时数据存入redis
redisTemplate.opsForValue().set(RedisConstants.CURRENT_INFO+":"+vehicleMessage.getVin(),vehicleMessage);
// 车辆故障处理
rabbitTemplate.convertSendAndReceive(RabbitConstants.FAULT_HANDLER_QUEUE,objectMessage);
// 车辆围栏处理
rabbitTemplate.convertSendAndReceive(RabbitConstants.FENCE_HANDLER_QUEUE, JSON.toJSONString(vehicleMessage));
try {
String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(new Date(vehicleMessage.getTime()));
log.info("信息入库;database:{}vin{}time{}",tableName,vehicleMessage.getVin(),System.currentTimeMillis());
// 将信息添加到hbase
hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getTime(),"info",vehicleMessage);
}catch (Exception e){
log.info("添加hbase出错{}",e.getMessage());
}
}
}
}

View File

@ -0,0 +1,39 @@
package com.shiyi.analysis.config;
import com.shiyi.analysis.constants.RabbitConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @Description :Rabbit
* @Author : YangHaoYu
* @Date: 2023-11-26 10:04
*/
@Component
public class RabbitConfig {
/**
*
* @return
*/
@Bean
public Queue initFaultHandlerQueue(){
return new Queue(RabbitConstants.FAULT_HANDLER_QUEUE,true);
}
/**
*
* @return
*/
@Bean
public Queue initFenceHandlerQueue(){
return new Queue(RabbitConstants.FENCE_HANDLER_QUEUE,true);
}
}

View File

@ -0,0 +1,34 @@
package com.shiyi.analysis.config;
import com.shiyi.analysis.constants.HBaseConstant;
import com.shiyi.analysis.service.impl.HbaseServiceimpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description :
* @Author : YangHaoYu
* @Date: 2023-11-26 10:11
*/
@Component
public class RunnerAfter implements ApplicationRunner {
@Autowired
private HbaseServiceimpl hbaseServiceimpl;
@Override
public void run(ApplicationArguments args) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM-dd");
String format = sdf.format(new Date());
//项目启动时根据当前时间判断是否建表
if (hbaseServiceimpl.isisTableExists(HBaseConstant.HBASE_TABLE_PREFIX+format)){
hbaseServiceimpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+format,new String[]{"info"});
}
}
}

View File

@ -0,0 +1,33 @@
package com.shiyi.analysis.config;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description :
* @Author : YangHaoYu
* @Date: 2023-11-26 10:34
*/
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolExecutor aAnalysisThreadPool() {
// 创建一个分析线程池
String threadPoolId = "fate-analysis";
// 使用线程工厂和线程池ID创建线程池构建器
ThreadPoolExecutor threadPoolBuilder = ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
// 返回线程池构建器
return threadPoolBuilder;
}
}

View File

@ -0,0 +1,40 @@
package com.shiyi.analysis.config;
import com.shiyi.analysis.constants.HBaseConstant;
import com.shiyi.analysis.service.impl.HbaseServiceimpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description :
* @Author : YangHaoYu
* @Date: 2023-11-26 10:42
*/
@Component
public class TimeConfig {
@Autowired
private HbaseServiceimpl hbaseServiceimpl;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
/**
*
* @throws IOException
*/
@Scheduled(cron = "0 0 0 * * ?")
public void flushNowDay() throws IOException {
String format = sdf.format(new Date());
//判断表是否存在
if(hbaseServiceimpl.isisTableExists(HBaseConstant.HBASE_TABLE_PREFIX+format)){
//不存在 创建表
hbaseServiceimpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+format,new String[]{"info"});
}
}
}

View File

@ -8,4 +8,8 @@ package com.shiyi.analysis.hbase.exception;
public class HBaseException extends RuntimeException{
public HBaseException(String message){super(message);}
public HBaseException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -4,6 +4,7 @@ import com.shiyi.analysis.hbase.config.HbaseConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.context.annotation.Bean;
@ -39,4 +40,15 @@ public class HBaseInstance {
return null;
}
}
@Bean(name = "hbaseAdmin")
public Admin initAdmin(Connection connection){
try {
return connection.getAdmin();
}catch (Exception e){
log.error("创建HBase管理API出错",e);
return null;
}
}
}

View File

@ -2,6 +2,7 @@ package com.shiyi.analysis.service;
import com.shiyi.analysis.domain.VehicleMessage;
import java.io.IOException;
import java.util.Collection;
/**
@ -13,4 +14,11 @@ public interface HbaseService {
Collection<? extends VehicleMessage> scanRange(String tablename, String start, String end);
Collection<? extends VehicleMessage> scanRangeByStart(String tablename, String start);
void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException;
boolean isisTableExists(String tableName) throws IOException;
void createTable(String tableName, String[] columnFamily) throws IOException;
}

View File

@ -6,16 +6,22 @@ import com.shiyi.analysis.service.HbaseService;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* @Description :
@ -78,7 +84,7 @@ public class HbaseServiceimpl implements HbaseService {
List<Cell> cells = result.listCells();
for (Cell cell : cells){
String rowkey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());
//获取列族
//获取
String familyName = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
//获取列名
String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
@ -91,4 +97,46 @@ public class HbaseServiceimpl implements HbaseService {
}
return messages;
}
@Override
public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException {
TableName name = TableName.valueOf(String.valueOf(tableName));
Table table = connection.getTable(name);
Put put = new Put(Bytes.toBytes(row));
Class<? extends VehicleMessage> aClass = vehicleMessage.getClass();
Field[] declaredFields = aClass.getDeclaredFields();
List<String> collect = Arrays.stream(declaredFields).map(Field::getName).collect(Collectors.toList());
List<String> values =Arrays.stream(declaredFields).map(temp ->{
return String.valueOf(BeanUtils.getGetterMethodValue(aClass, temp.getName(),vehicleMessage));
}).collect(Collectors.toList());
for (int i = 0; i < collect.size(); i++) {
put.addColumn(Bytes.toBytes(columnFamilys),Bytes.toBytes(collect.get(i)),Bytes.toBytes(values.get(i)));
table.put(put);
}
}
@Override
public boolean isisTableExists(String tableName) throws IOException {
Admin admin = connection.getAdmin();
return admin.tableExists(TableName.valueOf(tableName));
}
@Override
public void createTable(String tableName, String[] columnFamily) throws IOException {
TableName name = TableName.valueOf(tableName);
//如果存在则删除
if (admin.tableExists(name)) {
admin.disableTable(name);
admin.deleteTable(name);
log.info("该数据表已被创建,表名:{}",name);
}else {
HTableDescriptor desc = new HTableDescriptor(name);
for (String cf : columnFamily) {
desc.addFamily(new HColumnDescriptor(cf));
}
admin.createTable(desc);
}
}
}