From cd38ae2edde05656dbe0c049194487d89e1931c3 Mon Sep 17 00:00:00 2001 From: Yang Haoyu <2241399212@qq.com> Date: Sun, 26 Nov 2023 18:33:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Dockerfile=20+=20=E6=8A=A5?= =?UTF-8?q?=E6=96=87=E6=95=B0=E6=8D=AE=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 21 +++++ .../java/com/shiyi/analysis/BeanUtils.java | 8 +- .../analysis/constants/KafkaConstant.java | 13 +++ .../analysis/utils/reflex/ReflectUtils.java | 24 +++--- .../shiyi/analysis/feign/AnalysisRemote.java | 5 ++ .../shiyi/analysis/feign/VehicleFeign.java | 31 +++++++ fate-modules-service/.gitignore | 3 +- fate-modules-service/pom.xml | 58 +++++++++++++ .../shiyi/analysis/config/KafkaConsumer.java | 83 ++++++++++++++++++- .../shiyi/analysis/config/RabbitConfig.java | 39 +++++++++ .../shiyi/analysis/config/RunnerAfter.java | 34 ++++++++ .../analysis/config/ThreadPoolConfig.java | 33 ++++++++ .../com/shiyi/analysis/config/TimeConfig.java | 40 +++++++++ .../hbase/exception/HBaseException.java | 4 + .../hbase/instance/HBaseInstance.java | 12 +++ .../shiyi/analysis/service/HbaseService.java | 8 ++ .../service/impl/HbaseServiceimpl.java | 50 ++++++++++- 17 files changed, 447 insertions(+), 19 deletions(-) create mode 100644 Dockerfile create mode 100644 fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java create mode 100644 fate-modules-remote/src/main/java/com/shiyi/analysis/feign/VehicleFeign.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/config/RabbitConfig.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/config/RunnerAfter.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/config/ThreadPoolConfig.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/config/TimeConfig.java diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ca264ea --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM anolis-registry.cn-zhangjiakou.cr.aliyuncs.com/openanolis/openjdk:17-8.6 + + +# 暴露端口号 +EXPOSE 10003/tcp + + +# 挂载目录位置 +VOLUME /home/logs/fate-common-datasource + +#构造 复制外部文件到docker 内部 +COPY /fate-modules-service/target/fance-menghang-service.jar /home/app.jar + +# 工作目录 exec -it 进来就是默认这个目录 +WORKDIR /home + +# 指定东八区 +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > .etc.timezone + +# 启动java程序 +CMD ["java","-Dfile.encoding=UTF-8","-jar","/home/app.jar"] diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/BeanUtils.java b/fate-modules-common/src/main/java/com/shiyi/analysis/BeanUtils.java index f99012e..534dc7c 100644 --- a/fate-modules-common/src/main/java/com/shiyi/analysis/BeanUtils.java +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/BeanUtils.java @@ -1,9 +1,9 @@ package com.shiyi.analysis; import com.fate.common.core.exception.ServiceException; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -19,8 +19,10 @@ import static org.springframework.beans.BeanUtils.copyProperties; * @Author : YangHaoYu * @Date: 2023-11-22 21:49 */ + +@Log4j2 public class BeanUtils { - private static final Logger log = LoggerFactory.getLogger(BeanUtils.class); + //private static final Logger log = LoggerFactory.getLogger(BeanUtils.class); /** Bean方法名中属性名开始的下标 */ private static final int BEAN_METHOD_PROP_INDEX = 3; diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java new file mode 100644 index 0000000..a672793 --- /dev/null +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java @@ -0,0 +1,13 @@ +package com.shiyi.analysis.constants; + +/** + * @Description : Kafka常量类 + * @Author : YangHaoYu + * @Date: 2023-11-24 13:55 + */ +public class KafkaConstant { + + public static final String ANALYSIS_MESSAGE = "analysis_message"; + + public static final String KAFKA_SERVERS = "123.249.90.97:9092"; +} diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/utils/reflex/ReflectUtils.java b/fate-modules-common/src/main/java/com/shiyi/analysis/utils/reflex/ReflectUtils.java index 4a5e129..fde2243 100644 --- a/fate-modules-common/src/main/java/com/shiyi/analysis/utils/reflex/ReflectUtils.java +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/utils/reflex/ReflectUtils.java @@ -2,10 +2,9 @@ package com.shiyi.analysis.utils.reflex; import com.shiyi.analysis.utils.date.DateUtils; import com.shiyi.analysis.utils.text.Convert; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.lang.reflect.*; import java.util.Date; @@ -15,6 +14,7 @@ import java.util.Date; * * @author markGuo */ +@Log4j2 @SuppressWarnings("rawtypes") public class ReflectUtils { /** @@ -35,7 +35,7 @@ public class ReflectUtils { /** * 日志 */ - private static Logger logger = LoggerFactory.getLogger(ReflectUtils.class); + //private static Logger logger = LoggerFactory.getLogger(ReflectUtils.class); /** * 调用Getter方法. @@ -88,14 +88,14 @@ public class ReflectUtils { public static E getFieldValue(final Object obj, final String fieldName) { Field field = getAccessibleField(obj, fieldName); if (field == null) { - logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 "); + log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 "); return null; } E result = null; try { result = (E) field.get(obj); } catch (IllegalAccessException e) { - logger.error("不可能抛出的异常{}", e.getMessage()); + log.error("不可能抛出的异常{}", e.getMessage()); } return result; } @@ -111,13 +111,13 @@ public class ReflectUtils { Field field = getAccessibleField(obj, fieldName); if (field == null) { // throw new IllegalArgumentException("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 "); - logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 "); + log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + fieldName + "] 字段 "); return; } try { field.set(obj, value); } catch (IllegalAccessException e) { - logger.error("不可能抛出的异常: {}", e.getMessage()); + log.error("不可能抛出的异常: {}", e.getMessage()); } } @@ -140,7 +140,7 @@ public class ReflectUtils { } Method method = getAccessibleMethod(obj, methodName, parameterTypes); if (method == null) { - logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 "); + log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 "); return null; } try { @@ -166,7 +166,7 @@ public class ReflectUtils { Method method = getAccessibleMethodByName(obj, methodName, args.length); if (method == null) { // 如果为空不报错,直接返回空。 - logger.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 "); + log.debug("在 [" + obj.getClass() + "] 中,没有找到 [" + methodName + "] 方法 "); return null; } try { @@ -329,19 +329,19 @@ public class ReflectUtils { Type genType = clazz.getGenericSuperclass(); if (!(genType instanceof ParameterizedType)) { - logger.debug(clazz.getSimpleName() + "'s superclass not ParameterizedType"); + log.debug(clazz.getSimpleName() + "'s superclass not ParameterizedType"); return Object.class; } Type[] params = ((ParameterizedType) genType).getActualTypeArguments(); if (index >= params.length || index < 0) { - logger.debug("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: " + log.debug("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: " + params.length); return Object.class; } if (!(params[index] instanceof Class)) { - logger.debug(clazz.getSimpleName() + " not set the actual class on superclass generic parameter"); + log.debug(clazz.getSimpleName() + " not set the actual class on superclass generic parameter"); return Object.class; } diff --git a/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/AnalysisRemote.java b/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/AnalysisRemote.java index 717f249..43a6321 100644 --- a/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/AnalysisRemote.java +++ b/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/AnalysisRemote.java @@ -15,6 +15,11 @@ import java.util.List; @FeignClient(name = "fate-fence") public interface AnalysisRemote { + /** + * 历史轨迹查询 + * @param drivingRecord + * @return + */ @GetMapping("/list") public Result> list(DrivingRecord drivingRecord); } diff --git a/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/VehicleFeign.java b/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/VehicleFeign.java new file mode 100644 index 0000000..da1516e --- /dev/null +++ b/fate-modules-remote/src/main/java/com/shiyi/analysis/feign/VehicleFeign.java @@ -0,0 +1,31 @@ +package com.shiyi.analysis.feign; + +import com.fate.common.core.domain.Result; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @Description : 远程调用车辆接口 + * @Author : YangHaoYu + * @Date: 2023-11-24 09:28 + */ +@FeignClient(name = "fate-fence") +public interface VehicleFeign { + + /** + * 修改车辆状态 + * @param vin + * @param status + * @return + */ + @PostMapping("/updateVehicleStatus/{vin}/{status}") + public Result updateVehicleStatus(@PathVariable("vin") String vin, @PathVariable("status")Integer status); + + +// @GetMapping("/vehicleList") +// public Result vehicleLists(); + + +} diff --git a/fate-modules-service/.gitignore b/fate-modules-service/.gitignore index 5ff6309..5793576 100644 --- a/fate-modules-service/.gitignore +++ b/fate-modules-service/.gitignore @@ -4,6 +4,7 @@ target/ !**/src/test/**/target/ ### IntelliJ IDEA ### +/.idea .idea/modules.xml .idea/jarRepositories.xml .idea/compiler.xml @@ -35,4 +36,4 @@ build/ .vscode/ ### Mac OS ### -.DS_Store \ No newline at end of file +.DS_Store diff --git a/fate-modules-service/pom.xml b/fate-modules-service/pom.xml index c94a379..7a61f2b 100644 --- a/fate-modules-service/pom.xml +++ b/fate-modules-service/pom.xml @@ -43,7 +43,65 @@ fate-modules-fence 3.6.3 + + org.example + fate-modules-remote + 3.6.3 + compile + + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + + org.springframework.kafka + spring-kafka + + + + + cn.hippo4j + hippo4j-core-spring-boot-starter + 1.3.1 + + + + org.slf4j + slf4j-reload4j + 1.7.36 + + + org.slf4j + slf4j-api + + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + 2.7.2 + + + + repackage + + + + + + + diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java index 6d200af..788e7f4 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java @@ -1,11 +1,30 @@ package com.shiyi.analysis.config; +import com.alibaba.fastjson2.JSON; +import com.shiyi.analysis.constants.HBaseConstant; +import com.shiyi.analysis.constants.KafkaConstant; +import com.shiyi.analysis.constants.RabbitConstants; +import com.shiyi.analysis.constants.RedisConstants; +import com.shiyi.analysis.domain.VehicleMessage; +import com.shiyi.analysis.feign.VehicleFeign; +import com.shiyi.analysis.handler.MessageHandler; import com.shiyi.analysis.service.HbaseService; import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + /** * @Description : KafKa消费者端 * @Author : YangHaoYu @@ -21,6 +40,66 @@ public class KafkaConsumer { @Autowired private RedisTemplate redisTemplate; -// @Autowired -// private VehicleFeign vehicleFeign; + @Autowired + private VehicleFeign vehicleFeign; + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private ThreadPoolExecutor alanAnalysisThreadPool; + + private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + + private SimpleDateFormat tableNamemat = new SimpleDateFormat("yyyy-MM-dd"); + + /** + *kafka消息接收端 + * @param consumerRecord + */ + @KafkaListener(topics = KafkaConstant.ANALYSIS_MESSAGE) + public void OnMmessage(ConsumerRecord consumerRecord){ + //提交到线程池里 + alanAnalysisThreadPool.submit(() ->{ + + }); + } + + + /** + * 消息处理方法 + * @param consumerRecord ConsumerRecord对象 + */ + public void messageHandler(ConsumerRecord consumerRecord){ + Optional value = Optional.ofNullable(consumerRecord.value()); + log.info("partition:"+consumerRecord.partition()); + log.info("key:"+consumerRecord.key()); + if (value.isPresent()){ + // 获取到消息内容 + Object object = value.get(); + log.info("topic:{},分区:{},KafKa接收到的消息:{}","vehicle_message",consumerRecord.partition(),object); + String objectMessage = (String) object; + // 将车辆消息转换为实体类 + VehicleMessage vehicleMessage = MessageHandler.messageTranslated(objectMessage); + // 将车辆实时数据存入redis + redisTemplate.opsForValue().set(RedisConstants.CURRENT_INFO+":"+vehicleMessage.getVin(),vehicleMessage); + // 车辆故障处理 + rabbitTemplate.convertSendAndReceive(RabbitConstants.FAULT_HANDLER_QUEUE,objectMessage); + // 车辆围栏处理 + rabbitTemplate.convertSendAndReceive(RabbitConstants.FENCE_HANDLER_QUEUE, JSON.toJSONString(vehicleMessage)); + try { + String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(new Date(vehicleMessage.getTime())); + log.info("信息入库;database:{},vin:{},time:{}",tableName,vehicleMessage.getVin(),System.currentTimeMillis()); + // 将信息添加到hbase + hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getTime(),"info",vehicleMessage); + }catch (Exception e){ + log.info("添加hbase出错:{}",e.getMessage()); + } + } + } + + + + + } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/RabbitConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/RabbitConfig.java new file mode 100644 index 0000000..18e7569 --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/RabbitConfig.java @@ -0,0 +1,39 @@ +package com.shiyi.analysis.config; + +import com.shiyi.analysis.constants.RabbitConstants; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + + + +/** + * @Description :Rabbit故障处理 + * @Author : YangHaoYu + * @Date: 2023-11-26 10:04 + */ +@Component +public class RabbitConfig { + + + /** + * 创建故障处理队列 + * @return + */ + @Bean + public Queue initFaultHandlerQueue(){ + return new Queue(RabbitConstants.FAULT_HANDLER_QUEUE,true); + } + + /** + * 创建围栏处理队列 + * @return + */ + @Bean + public Queue initFenceHandlerQueue(){ + return new Queue(RabbitConstants.FENCE_HANDLER_QUEUE,true); + } + + + +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/RunnerAfter.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/RunnerAfter.java new file mode 100644 index 0000000..908b1b9 --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/RunnerAfter.java @@ -0,0 +1,34 @@ +package com.shiyi.analysis.config; + +import com.shiyi.analysis.constants.HBaseConstant; +import com.shiyi.analysis.service.impl.HbaseServiceimpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * @Description : + * @Author : YangHaoYu + * @Date: 2023-11-26 10:11 + */ +@Component +public class RunnerAfter implements ApplicationRunner { + + @Autowired + private HbaseServiceimpl hbaseServiceimpl; + + + @Override + public void run(ApplicationArguments args) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM-dd"); + String format = sdf.format(new Date()); + //项目启动时根据当前时间判断是否建表 + if (hbaseServiceimpl.isisTableExists(HBaseConstant.HBASE_TABLE_PREFIX+format)){ + hbaseServiceimpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+format,new String[]{"info"}); + } + } +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/ThreadPoolConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/ThreadPoolConfig.java new file mode 100644 index 0000000..04ef7da --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/ThreadPoolConfig.java @@ -0,0 +1,33 @@ +package com.shiyi.analysis.config; + +import cn.hippo4j.core.executor.DynamicThreadPool; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @Description : + * @Author : YangHaoYu + * @Date: 2023-11-26 10:34 + */ +@Configuration +public class ThreadPoolConfig { + @Bean + @DynamicThreadPool + public ThreadPoolExecutor aAnalysisThreadPool() { + // 创建一个分析线程池 + String threadPoolId = "fate-analysis"; + // 使用线程工厂和线程池ID创建线程池构建器 + ThreadPoolExecutor threadPoolBuilder = ThreadPoolBuilder.builder() + .threadFactory(threadPoolId) + .threadPoolId(threadPoolId) + .dynamicPool() + .build(); + // 返回线程池构建器 + return threadPoolBuilder; + } + + +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/TimeConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/TimeConfig.java new file mode 100644 index 0000000..f18ef05 --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/TimeConfig.java @@ -0,0 +1,40 @@ +package com.shiyi.analysis.config; + +import com.shiyi.analysis.constants.HBaseConstant; +import com.shiyi.analysis.service.impl.HbaseServiceimpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * @Description : + * @Author : YangHaoYu + * @Date: 2023-11-26 10:42 + */ +@Component +public class TimeConfig { + + @Autowired + private HbaseServiceimpl hbaseServiceimpl; + + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + + /** + *定时任务每日创建表重新赋值新表地址 + * @throws IOException + */ + @Scheduled(cron = "0 0 0 * * ?") + public void flushNowDay() throws IOException { + String format = sdf.format(new Date()); + //判断表是否存在 + if(hbaseServiceimpl.isisTableExists(HBaseConstant.HBASE_TABLE_PREFIX+format)){ + //不存在 创建表 + hbaseServiceimpl.createTable(HBaseConstant.HBASE_TABLE_PREFIX+format,new String[]{"info"}); + } + } +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/exception/HBaseException.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/exception/HBaseException.java index 753c80d..0d3dad4 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/exception/HBaseException.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/exception/HBaseException.java @@ -8,4 +8,8 @@ package com.shiyi.analysis.hbase.exception; public class HBaseException extends RuntimeException{ public HBaseException(String message){super(message);} + + public HBaseException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java index 14a77e7..8387ea9 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java @@ -4,6 +4,7 @@ import com.shiyi.analysis.hbase.config.HbaseConfig; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.springframework.context.annotation.Bean; @@ -39,4 +40,15 @@ public class HBaseInstance { return null; } } + + + @Bean(name = "hbaseAdmin") + public Admin initAdmin(Connection connection){ + try { + return connection.getAdmin(); + }catch (Exception e){ + log.error("创建HBase管理API出错",e); + return null; + } + } } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java index 481d758..b9a95e0 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java @@ -2,6 +2,7 @@ package com.shiyi.analysis.service; import com.shiyi.analysis.domain.VehicleMessage; +import java.io.IOException; import java.util.Collection; /** @@ -13,4 +14,11 @@ public interface HbaseService { Collection scanRange(String tablename, String start, String end); Collection scanRangeByStart(String tablename, String start); + + void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException; + + + boolean isisTableExists(String tableName) throws IOException; + + void createTable(String tableName, String[] columnFamily) throws IOException; } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java index 251284e..e942fd4 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java @@ -6,16 +6,22 @@ import com.shiyi.analysis.service.HbaseService; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; /** * @Description : 数据业务层 @@ -78,7 +84,7 @@ public class HbaseServiceimpl implements HbaseService { List cells = result.listCells(); for (Cell cell : cells){ String rowkey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()); - //获取列族名 + //获取组名 String familyName = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()); //获取列名 String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()); @@ -91,4 +97,46 @@ public class HbaseServiceimpl implements HbaseService { } return messages; } + + @Override + public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException { + TableName name = TableName.valueOf(String.valueOf(tableName)); + Table table = connection.getTable(name); + Put put = new Put(Bytes.toBytes(row)); + Class aClass = vehicleMessage.getClass(); + Field[] declaredFields = aClass.getDeclaredFields(); + List collect = Arrays.stream(declaredFields).map(Field::getName).collect(Collectors.toList()); + List values =Arrays.stream(declaredFields).map(temp ->{ + return String.valueOf(BeanUtils.getGetterMethodValue(aClass, temp.getName(),vehicleMessage)); + }).collect(Collectors.toList()); + for (int i = 0; i < collect.size(); i++) { + put.addColumn(Bytes.toBytes(columnFamilys),Bytes.toBytes(collect.get(i)),Bytes.toBytes(values.get(i))); + table.put(put); + } + } + + + @Override + public boolean isisTableExists(String tableName) throws IOException { + Admin admin = connection.getAdmin(); + return admin.tableExists(TableName.valueOf(tableName)); + } + + @Override + public void createTable(String tableName, String[] columnFamily) throws IOException { + TableName name = TableName.valueOf(tableName); + //如果存在则删除 + if (admin.tableExists(name)) { + admin.disableTable(name); + admin.deleteTable(name); + log.info("该数据表已被创建,表名:{}",name); + }else { + HTableDescriptor desc = new HTableDescriptor(name); + + for (String cf : columnFamily) { + desc.addFamily(new HColumnDescriptor(cf)); + } + admin.createTable(desc); + } + } }