From ec1616e6d91659ce338cb2b8786e1786f3aa6a84 Mon Sep 17 00:00:00 2001 From: Yang Haoyu <2241399212@qq.com> Date: Wed, 29 Nov 2023 22:16:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=BA=90=20+=20=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E6=95=B0=E6=8D=AE=E6=95=B4=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fate-modules-common/pom.xml | 12 - .../analysis/constants/KafkaConstant.java | 2 +- .../analysis/constants/VehicleConstant.java | 2 +- .../shiyi/analysis/domain/VehicleMessage.java | 621 +++--------------- fate-modules-service/pom.xml | 57 ++ .../shiyi/analysis/AnalysisApplication.java | 2 - .../shiyi/analysis/config/KafkaConsumer.java | 9 +- .../controller/AnalysisController.java | 2 +- .../analysis/handler/MessageHandler.java | 196 +++--- .../handler/MessageHandlerRegular.java | 93 +++ .../analysis/hbase/config/HbaseConfig.java | 2 + .../hbase/instance/HBaseInstance.java | 158 ++++- .../analysis/kafka/config/KafkaConfig.java | 28 + .../analysis/kafka/config/MQTTConfig.java | 41 ++ .../kafka/exception/KafkaException.java | 17 + .../kafka/instance/KafkaInstance.java | 37 ++ .../analysis/kafka/service/KafkaService.java | 20 + .../kafka/service/impl/KafkaServiceimpl.java | 97 +++ .../shiyi/analysis/service/HbaseService.java | 36 +- .../service/impl/HbaseServiceimpl.java | 188 +++++- .../src/main/resources/banner.txt | 2 - .../src/main/resources/bootstrap.yml | 4 +- .../src/main/resources/logback.xml | 74 --- 23 files changed, 964 insertions(+), 736 deletions(-) create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java create mode 100644 fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java delete mode 100644 fate-modules-service/src/main/resources/banner.txt delete mode 100644 fate-modules-service/src/main/resources/logback.xml diff --git a/fate-modules-common/pom.xml b/fate-modules-common/pom.xml index 89d4494..42c71f8 100644 --- a/fate-modules-common/pom.xml +++ b/fate-modules-common/pom.xml @@ -61,18 +61,6 @@ fate-common-datascope - - - com.fate - fate-common-log - - - - - com.fate - fate-common-swagger - - diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java index a672793..652befd 100644 --- a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java @@ -9,5 +9,5 @@ public class KafkaConstant { public static final String ANALYSIS_MESSAGE = "analysis_message"; - public static final String KAFKA_SERVERS = "123.249.90.97:9092"; + public static final String KAFKA_SERVERS = "182.254.222.21:9092"; } diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java index 0b8df14..1cf2c3b 100644 --- a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java @@ -1,7 +1,7 @@ package com.shiyi.analysis.constants; /** - * @Description : 报文 + * @Description : 报文解析类 * @Author : YangHaoYu * @Date: 2023-11-22 19:35 */ diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java b/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java index 6117639..0722cf3 100644 --- a/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java +++ b/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java @@ -1,10 +1,16 @@ package com.shiyi.analysis.domain; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import java.math.BigDecimal; +import java.util.Date; + /** * @Description : 车辆实时数据 * @Author : YangHaoYu @@ -14,724 +20,289 @@ import lombok.experimental.SuperBuilder; @SuperBuilder @AllArgsConstructor @NoArgsConstructor +@TableName("vehicle_message") public class VehicleMessage { /** - * 消息标识 + * VIN */ - private String identifier; + @TableId(value = "vin") + private String vin; /** - * VIN码 + * 时间戳 */ - private String vin; + @TableField("create_time") + private Date createTime; /** * 经度 */ + @TableField("longitude") private String longitude; /** - * 纬度 + * 维度 */ + @TableField("latitude") private String latitude; /** - * 车速 + * 速度 */ + @TableField("speed") private String speed; /** * 总里程 */ - private String totalDistance; + @TableField("mileage") + private BigDecimal mileage; /** * 总电压 */ - private String totalVoltage; + @TableField("voltage") + private String voltage; /** * 总电流 */ - private String jointCurrent; + @TableField("current") + private String current; /** * 绝缘电阻 */ - private String insulationResistance; + @TableField("resistance") + private String resistance; /** - * 档位 + * 挡位 */ - private String gears; + @TableField("gear") + private String gear; /** * 加速踏板行程值 */ - private String acceleratorPedal; + @TableField("accelerationPedal") + private String accelerationPedal; /** * 制动踏板行程值 */ + @TableField("brakePedal") private String brakePedal; /** * 燃料消耗率 */ - private String fuelRate; + @TableField("fuelConsumptionRate") + private String fuelConsumptionRate; /** * 电机控制器温度 */ + @TableField("motorControllerTemperature") private String motorControllerTemperature; /** * 电机转速 */ + @TableField("motorSpeed") private String motorSpeed; /** * 电机转矩 */ + @TableField("motorTorque") private String motorTorque; /** * 电机温度 */ + @TableField("motorTemperature") private String motorTemperature; /** * 电机电压 */ + @TableField("motorVoltage") private String motorVoltage; /** * 电机电流 */ + @TableField("motorCurrent") private String motorCurrent; /** * 动力电池剩余电量SOC */ - private String dumpEnergy; + @TableField("remainingBattery") + private BigDecimal remainingBattery; /** * 当前状态允许的最大反馈功率 */ + @TableField("maximumFeedbackPower") private String maximumFeedbackPower; /** * 当前状态允许最大放电功率 */ + @TableField("maximumDischargePower") private String maximumDischargePower; /** * BMS自检计数器 */ - private String selfCheckingCounter; + @TableField("selfCheckCounter") + private String selfCheckCounter; /** * 动力电池充放电电流 */ - private String chargingAndDischargingCurrent; + @TableField("totalBatteryCurrent") + private String totalBatteryCurrent; /** * 动力电池负载端总电压V3 */ - private String totalVoltageAtLoadEnd; + @TableField("totalBatteryVoltage") + private String totalBatteryVoltage; /** * 单次最大电压 */ - private String maximumVoltage; + @TableField("singleBatteryMaxVoltage") + private String singleBatteryMaxVoltage; /** * 单体电池最低电压 */ - private String lowestVoltageBattery; + @TableField("singleBatteryMinVoltage") + private String singleBatteryMinVoltage; /** * 单体电池最高温度 */ - private String maximumTemperatureBattery; + @TableField("singleBatteryMaxTemperature") + private String singleBatteryMaxTemperature; /** * 单体电池最低温度 */ - private String lowestTemperatureBattery; + @TableField("singleBatteryMinTemperature") + private String singleBatteryMinTemperature; /** * 动力电池可用容量 */ + @TableField("availableBatteryCapacity") private String availableBatteryCapacity; /** * 车辆状态 */ - private Integer vehicleState; + @TableField("vehicleStatus") + private int vehicleStatus; /** * 充电状态 */ - private Integer chargingState; + @TableField("chargingStatus") + private int chargingStatus; /** * 运行状态 */ - private Integer runningStatus; + @TableField("operatingStatus") + private int operatingStatus; /** * SOC */ - private Integer socStatus; + @TableField("socStatus") + private int socStatus; /** * 可充电储能装置工作状态 */ - private Integer energyStorageDeviceState; + @TableField("chargingEnergyStorageStatus") + private int chargingEnergyStorageStatus; /** * 驱动电机状态 */ - private Integer driveMotorCondition; + @TableField("driveMotorStatus") + private int driveMotorStatus; /** * 定位是否有效 */ - private Integer positioningState; + @TableField("positionStatus") + private int positionStatus; /** - * EAS 电子防窃系统状态 + * EAS(汽车防盗系统)状态 */ - private Integer easStatus; + @TableField("easStatus") + private int easStatus; /** - * PTC 传动系统状态 + * PTC(电动加热器)状态 */ - private Integer ptcStatus; + @TableField("ptcStatus") + private int ptcStatus; /** - * EPS 蓄电池状态 + * EPS(电动助力系统)状态 */ - private Integer epsStatus; + @TableField("epsStatus") + private int epsStatus; /** - * ABS 防滑刹车系统状态 + * ABS(防抱死)状态 */ - private Integer absStatus; + @TableField("absStatus") + private int absStatus; /** - * MCU 微处理器状态 + * MCU(电机/逆变器)状态 */ - private Integer mcuStatus; + @TableField("mcuStatus") + private int mcuStatus; /** * 动力电池加热状态 */ - private Integer batteryHeatingCondition; + @TableField("heatingStatus") + private int heatingStatus; /** * 动力电池当前状态 */ - private Integer currentBatteryStatus; + @TableField("batteryStatus") + private int batteryStatus; /** * 动力电池保温状态 */ - private Integer batteryInsulationStatus; - + @TableField("batteryInsulationStatus") + private int batteryInsulationStatus; /** * DCDC 电力交换系统状态 */ + @TableField("dcdcStatus") private Integer dcdcStatus; /** * CHG 充电机状态 */ + @TableField("chgStatus") private Integer chgStatus; - /** - *时间 - */ - private Long time; - public Long getTime() { - return time; - } - public void setTime(Object time) { - if(time != null){ - this.time = Long.valueOf(time.toString()); - } - } - - public String getIdentifier() { - return identifier; - } - - public void setIdentifier(Object identifier) { - if(identifier != null) { - this.identifier = (String) identifier; - } - } - - public String getVin() { - return vin; - } - - public void setVin(Object vin) { - if(vin != null) { - this.vin = (String) vin; - } - } - - public String getLongitude() { - return longitude; - } - - public void setLongitude(Object longitude) { - if(longitude != null) { - this.longitude = (String) longitude; - } - } - - public String getLatitude() { - return latitude; - } - - public void setLatitude(Object latitude) { - if(latitude != null) { - this.latitude = (String) latitude; - } - } - - public String getSpeed() { - return speed; - } - - public void setSpeed(Object speed) { - if(speed != null) { - this.speed = (String) speed; - } - } - - public String getTotalDistance() { - return totalDistance; - } - - public void setTotalDistance(Object totalDistance) { - if(totalDistance != null) { - this.totalDistance = (String) totalDistance; - } - } - - public String getTotalVoltage() { - return totalVoltage; - } - - public void setTotalVoltage(Object totalVoltage) { - if(totalVoltage != null) { - this.totalVoltage = (String) totalVoltage; - } - } - - public String getJointCurrent() { - return jointCurrent; - } - - public void setJointCurrent(Object jointCurrent) { - if(jointCurrent != null) { - this.jointCurrent = (String) jointCurrent; - } - } - - public String getInsulationResistance() { - return insulationResistance; - } - - public void setInsulationResistance(Object insulationResistance) { - if(insulationResistance != null) { - this.insulationResistance = (String) insulationResistance; - } - } - - public String getGears() { - return gears; - } - - public void setGears(Object gears) { - if(gears != null) { - this.gears = (String) gears; - } - } - - public String getAcceleratorPedal() { - return acceleratorPedal; - } - - public void setAcceleratorPedal(Object acceleratorPedal) { - if(acceleratorPedal != null) { - this.acceleratorPedal = (String) acceleratorPedal; - } - } - - public String getBrakePedal() { - return brakePedal; - } - - public void setBrakePedal(Object brakePedal) { - if(brakePedal != null) { - this.brakePedal = (String) brakePedal; - } - } - - public String getFuelRate() { - return fuelRate; - } - - public void setFuelRate(Object fuelRate) { - if(fuelRate != null) { - this.fuelRate = (String) fuelRate; - } - } - - public String getMotorControllerTemperature() { - return motorControllerTemperature; - } - - public void setMotorControllerTemperature(Object motorControllerTemperature) { - if(motorControllerTemperature != null) { - this.motorControllerTemperature = (String) motorControllerTemperature; - } - } - - public String getMotorSpeed() { - return motorSpeed; - } - - public void setMotorSpeed(Object motorSpeed) { - if(motorSpeed != null) { - this.motorSpeed = (String) motorSpeed; - } - } - - public String getMotorTorque() { - return motorTorque; - } - - public void setMotorTorque(Object motorTorque) { - if(motorTorque != null) { - this.motorTorque = (String) motorTorque; - } - } - - public String getMotorTemperature() { - return motorTemperature; - } - - public void setMotorTemperature(Object motorTemperature) { - if(motorTemperature != null) { - this.motorTemperature = (String) motorTemperature; - } - } - - public String getMotorVoltage() { - return motorVoltage; - } - - public void setMotorVoltage(Object motorVoltage) { - if(motorVoltage != null) { - this.motorVoltage = (String) motorVoltage; - } - } - - public String getMotorCurrent() { - return motorCurrent; - } - - public void setMotorCurrent(Object motorCurrent) { - if(motorCurrent != null) { - this.motorCurrent = (String) motorCurrent; - } - } - - public String getDumpEnergy() { - return dumpEnergy; - } - - public void setDumpEnergy(Object dumpEnergy) { - if(dumpEnergy != null) { - this.dumpEnergy = (String) dumpEnergy; - } - } - - public String getMaximumFeedbackPower() { - return maximumFeedbackPower; - } - - public void setMaximumFeedbackPower(Object maximumFeedbackPower) { - if(maximumFeedbackPower != null) { - this.maximumFeedbackPower = (String) maximumFeedbackPower; - } - } - - public String getMaximumDischargePower() { - return maximumDischargePower; - } - - public void setMaximumDischargePower(Object maximumDischargePower) { - if(maximumDischargePower != null) { - this.maximumDischargePower = (String) maximumDischargePower; - } - } - - public String getSelfCheckingCounter() { - return selfCheckingCounter; - } - - public void setSelfCheckingCounter(Object selfCheckingCounter) { - if(selfCheckingCounter != null) { - this.selfCheckingCounter = (String) selfCheckingCounter; - } - } - - public String getChargingAndDischargingCurrent() { - return chargingAndDischargingCurrent; - } - - public void setChargingAndDischargingCurrent(Object chargingAndDischargingCurrent) { - if(chargingAndDischargingCurrent != null) { - this.chargingAndDischargingCurrent = (String) chargingAndDischargingCurrent; - } - } - - public String getTotalVoltageAtLoadEnd() { - return totalVoltageAtLoadEnd; - } - - public void setTotalVoltageAtLoadEnd(Object totalVoltageAtLoadEnd) { - if(totalVoltageAtLoadEnd != null) { - this.totalVoltageAtLoadEnd = (String) totalVoltageAtLoadEnd; - } - } - - public String getMaximumVoltage() { - return maximumVoltage; - } - - public void setMaximumVoltage(Object maximumVoltage) { - if(maximumVoltage != null) { - this.maximumVoltage = (String) maximumVoltage; - } - } - - public String getLowestVoltageBattery() { - return lowestVoltageBattery; - } - - public void setLowestVoltageBattery(Object lowestVoltageBattery) { - if(lowestVoltageBattery != null) { - this.lowestVoltageBattery = (String) lowestVoltageBattery; - } - } - - public String getMaximumTemperatureBattery() { - return maximumTemperatureBattery; - } - - public void setMaximumTemperatureBattery(Object maximumTemperatureBattery) { - if(maximumTemperatureBattery != null) { - this.maximumTemperatureBattery = (String) maximumTemperatureBattery; - } - } - - public String getLowestTemperatureBattery() { - return lowestTemperatureBattery; - } - - public void setLowestTemperatureBattery(Object lowestTemperatureBattery) { - if(lowestTemperatureBattery != null) { - this.lowestTemperatureBattery = (String) lowestTemperatureBattery; - } - } - - public String getAvailableBatteryCapacity() { - return availableBatteryCapacity; - } - - public void setAvailableBatteryCapacity(Object availableBatteryCapacity) { - if(availableBatteryCapacity != null) { - this.availableBatteryCapacity = (String) availableBatteryCapacity; - } - } - - public Integer getVehicleState() { - return vehicleState; - } - - public void setVehicleState(Object vehicleState) { - if(vehicleState != null) { - this.vehicleState = Integer.valueOf(vehicleState.toString()); - } - } - - public Integer getChargingState() { - return chargingState; - } - - public void setChargingState(Object chargingState) { - if(chargingState != null) { - this.chargingState = Integer.valueOf(chargingState.toString()); - } - } - - public Integer getRunningStatus() { - return runningStatus; - } - - public void setRunningStatus(Object runningStatus) { - if(runningStatus != null){ - this.runningStatus = Integer.valueOf(runningStatus.toString()); - } - } - - public Integer getSocStatus() { - return socStatus; - } - - public void setSocStatus(Object socStatus) { - if(socStatus != null) { - this.socStatus = Integer.valueOf(socStatus.toString()); - } - } - - public Integer getEnergyStorageDeviceState() { - return energyStorageDeviceState; - } - - public void setEnergyStorageDeviceState(Object energyStorageDeviceState) { - if(energyStorageDeviceState != null){ - this.energyStorageDeviceState = Integer.valueOf(energyStorageDeviceState.toString()); - } - } - - public Integer getDriveMotorCondition() { - return driveMotorCondition; - } - - public void setDriveMotorCondition(Object driveMotorCondition) { - if(driveMotorCondition != null) { - this.driveMotorCondition = Integer.valueOf(driveMotorCondition.toString()); - } - } - - public Integer getPositioningState() { - return positioningState; - } - - public void setPositioningState(Object positioningState) { - if(positioningState != null) { - this.positioningState = Integer.valueOf(positioningState.toString()); - } - } - - public Integer getEasStatus() { - return easStatus; - } - - public void setEasStatus(Object easStatus) { - if(easStatus != null) { - this.easStatus = Integer.valueOf(easStatus.toString()); - } - } - - public Integer getPtcStatus() { - return ptcStatus; - } - - public void setPtcStatus(Object ptcStatus) { - if(ptcStatus != null) { - this.ptcStatus = Integer.valueOf(ptcStatus.toString()); - } - } - - public Integer getEpsStatus() { - return epsStatus; - } - - public void setEpsStatus(Object epsStatus) { - if(epsStatus != null) { - this.epsStatus = Integer.valueOf(epsStatus.toString()); - } - } - - public Integer getAbsStatus() { - return absStatus; - } - - public void setAbsStatus(Object absStatus) { - if(absStatus != null) { - this.absStatus = Integer.valueOf(absStatus.toString()); - } - } - - public Integer getMcuStatus() { - return mcuStatus; - } - - public void setMcuStatus(Object mcuStatus) { - if(mcuStatus != null) { - this.mcuStatus = Integer.valueOf(mcuStatus.toString()); - } - } - - public Integer getBatteryHeatingCondition() { - return batteryHeatingCondition; - } - - public void setBatteryHeatingCondition(Object batteryHeatingCondition) { - if(batteryHeatingCondition != null) { - this.batteryHeatingCondition = Integer.valueOf(batteryHeatingCondition.toString()); - } - } - - public Integer getCurrentBatteryStatus() { - return currentBatteryStatus; - } - - public void setCurrentBatteryStatus(Object currentBatteryStatus) { - if(currentBatteryStatus != null){ - this.currentBatteryStatus = Integer.valueOf(currentBatteryStatus.toString()); - } - } - - public Integer getBatteryInsulationStatus() { - return batteryInsulationStatus; - } - - public void setBatteryInsulationStatus(Object batteryInsulationStatus) { - if(batteryInsulationStatus != null){ - this.batteryInsulationStatus = Integer.valueOf(batteryInsulationStatus.toString()); - } - } - - public Integer getDcdcStatus() { - return dcdcStatus; - } - - public void setDcdcStatus(Object dcdcStatus) { - if(dcdcStatus != null) { - this.dcdcStatus = Integer.valueOf(dcdcStatus.toString()); - } - } - - public Integer getChgStatus() { - return chgStatus; - } - - public void setChgStatus(Object chgStatus) { - if(chgStatus != null) { - this.chgStatus = Integer.valueOf(chgStatus.toString()); - } - } } diff --git a/fate-modules-service/pom.xml b/fate-modules-service/pom.xml index 7a61f2b..73cfdf6 100644 --- a/fate-modules-service/pom.xml +++ b/fate-modules-service/pom.xml @@ -36,6 +36,50 @@ hbase-client 2.4.7 + + org.hsqldb + hsqldb + 2.5.0 + + + org.apache.hadoop + hadoop-common + 3.3.1 + + + org.apache.hadoop + hadoop-auth + 3.3.1 + + + org.apache.hadoop + hadoop-hdfs + 3.3.1 + + + org.apache.hbase + hbase-client + 1.4.7 + + + org.apache.hbase + hbase-common + 1.4.7 + + + org.apache.hbase + hbase-server + 1.4.7 + + + + + + + mysql + mysql-connector-java + 8.0.28 + @@ -81,6 +125,11 @@ + + + org.springframework.integration + spring-integration-mqtt + @@ -101,6 +150,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 15 + 15 + + diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java b/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java index ba0fc27..0e531e0 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java @@ -2,7 +2,6 @@ package com.shiyi.analysis; import com.fate.common.security.annotation.EnableCustomConfig; import com.fate.common.security.annotation.EnableMyFeignClients; -import com.fate.common.swagger.annotation.EnableCustomSwagger2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -12,7 +11,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @Date: 2023-11-20 22:16 */ @EnableCustomConfig -@EnableCustomSwagger2 @EnableMyFeignClients @SpringBootApplication public class AnalysisApplication { diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java index 788e7f4..6e40991 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java @@ -61,7 +61,7 @@ public class KafkaConsumer { public void OnMmessage(ConsumerRecord consumerRecord){ //提交到线程池里 alanAnalysisThreadPool.submit(() ->{ - + messageHandler(consumerRecord); }); } @@ -88,10 +88,10 @@ public class KafkaConsumer { // 车辆围栏处理 rabbitTemplate.convertSendAndReceive(RabbitConstants.FENCE_HANDLER_QUEUE, JSON.toJSONString(vehicleMessage)); try { - String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(new Date(vehicleMessage.getTime())); + String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(vehicleMessage.getCreateTime()); log.info("信息入库;database:{},vin:{},time:{}",tableName,vehicleMessage.getVin(),System.currentTimeMillis()); // 将信息添加到hbase - hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getTime(),"info",vehicleMessage); + hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getCreateTime(),"info",vehicleMessage); }catch (Exception e){ log.info("添加hbase出错:{}",e.getMessage()); } @@ -99,7 +99,4 @@ public class KafkaConsumer { } - - - } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java b/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java index 32d0f5d..606718f 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java @@ -38,7 +38,7 @@ public class AnalysisController { @SneakyThrows @PostMapping("trackHistory") public Result> trackHistory(@RequestBody DrivingRecord drivingRecord){ - log.info("历史查询 ——开始时间:【{}】,结束时间:【{}】",new Date(Long.parseLong(drivingRecord.getStartKey())).toLocaleString(),new Date(drivingRecord.getEndKey()).toLocaleString()); + log.info("查询 ——开始时间:【{}】,结束时间:【{}】",new Date(Long.parseLong(drivingRecord.getStartKey())).toLocaleString(),new Date(drivingRecord.getEndKey()).toLocaleString()); //根据输入输出时间格式化日期 String startDay = targetDay.format(new Date(Long.parseLong(drivingRecord.getStartKey()))); String endDay = targetDay.format(new Date(Long.parseLong(drivingRecord.getEndKey()))); diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java index d252218..baba3b4 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java @@ -3,8 +3,11 @@ package com.shiyi.analysis.handler; import com.shiyi.analysis.domain.VehicleMessage; import lombok.extern.log4j.Log4j2; +import java.math.BigDecimal; +import java.sql.Date; + /** - * @Description : 报文处理类 + * @Description : 报文切割处理类 * @Author : YangHaoYu * @Date: 2023-11-22 20:05 */ @@ -12,103 +15,106 @@ import lombok.extern.log4j.Log4j2; public class MessageHandler { public static VehicleMessage messageTranslated(String content){ + char start = content.charAt(0); + char end = content.charAt(content.length() - 1); + System.out.println(start); + System.out.println(end); VehicleMessage vehicleMessage = new VehicleMessage(); - //消息标识 - vehicleMessage.setIdentifier(content.substring(0,12)); //vin码 - vehicleMessage.setVin(content.substring(12,29)); - //经度 - vehicleMessage.setLongitude(content.substring(29,40)); - //纬度 - vehicleMessage.setLatitude(content.substring(40,50)); - //车速 - vehicleMessage.setSpeed(content.substring(50,56)); - //总里程 - vehicleMessage.setTotalDistance(content.substring(56,67)); - //总电压 - vehicleMessage.setTotalVoltage(content.substring(67,73)); - //总电流 - vehicleMessage.setJointCurrent(content.substring(73,78)); - //绝缘电阻 - vehicleMessage.setInsulationResistance(content.substring(78,87)); - //档位 - vehicleMessage.setGears(content.substring(87,88)); - //加速踏板行程值 - vehicleMessage.setAcceleratorPedal(content.substring(88,90)); - //制动踏板行程值 - vehicleMessage.setBrakePedal(content.substring(90,92)); - //燃料消耗率 - vehicleMessage.setFuelRate(content.substring(92,98)); - //电机控制器温度 - vehicleMessage.setMotorControllerTemperature(content.substring(98,103)); - //电机转速 - vehicleMessage.setMotorSpeed(content.substring(103,108)); - //电机转矩 - vehicleMessage.setMotorTorque(content.substring(108,112)); - //电机温度 - vehicleMessage.setMotorTemperature(content.substring(112,118)); - //电机电压 - vehicleMessage.setMotorVoltage(content.substring(118,123)); - //电机电流 - vehicleMessage.setMotorCurrent(content.substring(123,132)); - //动力电池剩余电量SOC - vehicleMessage.setDumpEnergy(content.substring(132,137)); - //当前状态允许的最大反馈功率 - vehicleMessage.setMaximumFeedbackPower(content.substring(137,143)); - //当前状态允许最大放电功率 - vehicleMessage.setMaximumDischargePower(content.substring(143,149)); - //BMS自检计数器 - vehicleMessage.setSelfCheckingCounter(content.substring(149,151)); - //动力电池充放电电流 - vehicleMessage.setChargingAndDischargingCurrent(content.substring(151,156)); - //动力电池负载端总电压V3 - vehicleMessage.setTotalVoltageAtLoadEnd(content.substring(156,162)); - //单次最大电压 - vehicleMessage.setMaximumVoltage(content.substring(162,166)); - //单体电池最低电压 - vehicleMessage.setLowestVoltageBattery(content.substring(166,170)); - //单体电池最高温度 - vehicleMessage.setMaximumTemperatureBattery(content.substring(170,176)); - //单体电池最低温度 - vehicleMessage.setLowestTemperatureBattery(content.substring(176,182)); - //动力电池可用容量 - vehicleMessage.setAvailableBatteryCapacity(content.substring(182,188)); - //车辆状态 - vehicleMessage.setVehicleState(Integer.valueOf(content.substring(188,189))); - //充电状态 - vehicleMessage.setChargingState(Integer.valueOf(content.substring(189,190))); - //运行状态 - vehicleMessage.setRunningStatus(Integer.valueOf(content.substring(190,191))); - //Soc状态 - vehicleMessage.setSocStatus(Integer.valueOf(content.substring(191,192))); - //可充电储能装置工作状态 - vehicleMessage.setEnergyStorageDeviceState(Integer.valueOf(content.substring(192,193))); - //驱动电机状态 - vehicleMessage.setDriveMotorCondition(Integer.valueOf(content.substring(193,194))); - //定位是否有效 - vehicleMessage.setPositioningState(Integer.valueOf(content.substring(194,195))); - //EAS 电子防窃系统状态 - vehicleMessage.setEasStatus(Integer.valueOf(content.substring(195,196))); - //PTC 传动系统状态 - vehicleMessage.setPtcStatus(Integer.valueOf(content.substring(196,197))); - //EPS 蓄电池状态 - vehicleMessage.setEpsStatus(Integer.valueOf(content.substring(197,198))); - //ABS 防滑刹车系统状态 - vehicleMessage.setAbsStatus(Integer.valueOf(content.substring(198,199))); - //MCU 微处理器状态 - vehicleMessage.setMcuStatus(Integer.valueOf(content.substring(199,200))); - //动力电池加热状态 - vehicleMessage.setBatteryHeatingCondition(Integer.valueOf(content.substring(200,201))); - //动力电池当前状态 - vehicleMessage.setCurrentBatteryStatus(Integer.valueOf(content.substring(201,202))); - //动力电池保温状态 - vehicleMessage.setBatteryInsulationStatus(Integer.valueOf(content.substring(202,203))); - //DCDC 电力交换系统状态 - vehicleMessage.setDcdcStatus(Integer.valueOf(content.substring(203,204))); - //CHG 充电机状态 - vehicleMessage.setChgStatus(Integer.valueOf(content.substring(204,205))); + vehicleMessage.setVin(content.substring(2,18)); //截取从车辆接受端获取端时间戳 - vehicleMessage.setTime(Long.valueOf(content.substring(205,218))); + vehicleMessage.setCreateTime(Date.valueOf(content.substring(18,31))); + //经度 + vehicleMessage.setLongitude(content.substring(31,42)); + //纬度 + vehicleMessage.setLatitude(content.substring(42,52)); + //车速 + vehicleMessage.setSpeed(content.substring(52,58)); + //总里程 + vehicleMessage.setMileage(new BigDecimal(content.substring(58,69))); + //总电压 + vehicleMessage.setVoltage(content.substring(69,75)); + //总电流 + vehicleMessage.setCurrent(content.substring(75,80)); + //绝缘电阻 + vehicleMessage.setResistance(content.substring(80,89)); + //档位 + vehicleMessage.setGear(content.substring(89,90)); + //加速踏板行程值 + vehicleMessage.setAccelerationPedal(content.substring(90,92)); + //制动踏板行程值 + vehicleMessage.setBrakePedal(content.substring(92,94)); + //燃料消耗率 + vehicleMessage.setFuelConsumptionRate(content.substring(94,99)); + //电机控制器温度 + vehicleMessage.setMotorControllerTemperature(content.substring(99,105)); + //电机转速 + vehicleMessage.setMotorSpeed(content.substring(105,110)); + //电机转矩 + vehicleMessage.setMotorTorque(content.substring(110,114)); + //电机温度 + vehicleMessage.setMotorTemperature(content.substring(114,120)); + //电机电压 + vehicleMessage.setMotorVoltage(content.substring(120,125)); + //电机电流 + vehicleMessage.setMotorCurrent(content.substring(125,133)); + //动力电池剩余电量SOC + vehicleMessage.setRemainingBattery(new BigDecimal(content.substring(133,139))); + //当前状态允许的最大反馈功率 + vehicleMessage.setMaximumFeedbackPower(content.substring(139,145)); + //当前状态允许最大放电功率 + vehicleMessage.setMaximumDischargePower(content.substring(145,151)); + //BMS自检计数器 + vehicleMessage.setSelfCheckCounter(content.substring(151,153)); + //动力电池充放电电流 + vehicleMessage.setTotalBatteryCurrent(content.substring(153,158)); + //动力电池负载端总电压V3 + vehicleMessage.setTotalBatteryVoltage(content.substring(158,164)); + //单次最大电压 + vehicleMessage.setSingleBatteryMaxVoltage(content.substring(164,168)); + //单体电池最低电压 + vehicleMessage.setSingleBatteryMinVoltage(content.substring(168,172)); + //单体电池最高温度 + vehicleMessage.setSingleBatteryMaxTemperature(content.substring(172,178)); + //单体电池最低温度 + vehicleMessage.setSingleBatteryMinTemperature(content.substring(178,184)); + //动力电池可用容量 + vehicleMessage.setAvailableBatteryCapacity(content.substring(184,190)); + //车辆状态 + vehicleMessage.setVehicleStatus(Integer.valueOf(content.substring(190,191))); + //充电状态 + vehicleMessage.setChargingStatus(Integer.valueOf(content.substring(191,192))); + //运行状态 + vehicleMessage.setOperatingStatus(Integer.valueOf(content.substring(192,193))); + //Soc状态 + vehicleMessage.setSocStatus(Integer.valueOf(content.substring(193,194))); //Soc状态 + //可充电储能装置工作状态 + vehicleMessage.setChargingEnergyStorageStatus(Integer.valueOf(content.substring(194,195))); + //驱动电机状态 + vehicleMessage.setDriveMotorStatus(Integer.valueOf(content.substring(195,196))); + //定位是否有效 + vehicleMessage.setPositionStatus(Integer.valueOf(content.substring(196,197))); + //EAS 电子防窃系统状态 + vehicleMessage.setEasStatus(Integer.valueOf(content.substring(197,198))); + //PTC 传动系统状态 + vehicleMessage.setPtcStatus(Integer.valueOf(content.substring(198,199))); + //EPS 蓄电池状态 + vehicleMessage.setEpsStatus(Integer.valueOf(content.substring(199,200))); + //ABS 防滑刹车系统状态 + vehicleMessage.setAbsStatus(Integer.valueOf(content.substring(200,201))); + //MCU 微处理器状态 + vehicleMessage.setMcuStatus(Integer.valueOf(content.substring(201,202))); + //动力电池加热状态 + vehicleMessage.setHeatingStatus(Integer.valueOf(content.substring(202,203))); + //动力电池当前状态 + vehicleMessage.setBatteryStatus(Integer.valueOf(content.substring(203,204))); + //动力电池保温状态 + vehicleMessage.setBatteryInsulationStatus(Integer.valueOf(content.substring(204,205))); + //DCDC 电力交换系统状态 + vehicleMessage.setDcdcStatus(Integer.valueOf(content.substring(205,206))); + //CHG 充电机状态 + vehicleMessage.setChgStatus(Integer.valueOf(content.substring(207,208))); + return vehicleMessage; } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java new file mode 100644 index 0000000..3dbcc4f --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java @@ -0,0 +1,93 @@ +package com.shiyi.analysis.handler; + +import com.shiyi.analysis.constants.VehicleConstant; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; + +/** + * @Description : 报文处理规则 + * @Author : YangHaoYu + * @Date: 2023-11-27 09:33 + */ +@Log4j2 +public class MessageHandlerRegular { + + /** + * 校验位计算 + * @param message + * @return + */ + public static boolean checkHandlerRule(String message) { + if (!message.contains(VehicleConstant.DATA_PACK_SEPARATOR)){ + return false; + } + String temp = message.replaceAll("#\\$&\\*", ""); + if (message.indexOf(VehicleConstant.MSG_START) != 0){ + return false; + } + if ((message.lastIndexOf(VehicleConstant.MSG_END)+2) != temp.length()){ + return false; + } + String content = message.substring(2, message.length() - 9); + String data = content.replace(" ", ""); + int dSum = 0; + int length = data.length(); + int index = 0; + //遍历十六进制,并计算总合 + while (index < length){ + //截取2位字符 + String hex = data.substring(index, index + 2); + //十六进制转成十进制 , 并计算十进制的总和 + int i = Integer.parseInt(hex, 16); + dSum += i; + index += 2; + } + //用256取余,十六进制最大是FF,FF的十进制是255 + int mod = dSum % 256; + //余数转成十六进制 + String checkSumHex = Integer.toHexString(mod); + length = checkSumHex.length(); + if (length < 2){ + //校验位不足两位的,在前面补0 + checkSumHex ="0" +checkSumHex; + } + String parityBit = message.substring(message.length() - 9, message.length() - 7); + return parityBit.equals(checkSumHex); + } + + + /** + * 将十六进制转为字符 + * @param hexString + * @return + */ + public static String theDecimal(String hexString){ + String[] split = hexString.split(" "); + StringBuilder stringBuilder = new StringBuilder(); + for (String info : split) { + if (StringUtils.isNotEmpty(info)){ + String decimalTemp = valueToAscii(Integer.valueOf(info, 16).toString()); + stringBuilder.append(decimalTemp); + } + } + return stringBuilder.toString(); + } + + + /** + *将10进制数据转为字符 + * @param value + * @return + */ + public static String valueToAscii(String value){ + StringBuilder builder = new StringBuilder(); + String[] chars = value.split(","); + for (String info : chars) { + builder.append((char)Integer.parseInt(info)); + } + return builder.toString(); + } + + + +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java index 2e79edd..ef8c304 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java @@ -29,4 +29,6 @@ public class HbaseConfig { public void setConfMaps(Map confMaps) { this.confMaps = confMaps; } + + } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java index 8387ea9..8f63a6f 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java @@ -1,15 +1,23 @@ package com.shiyi.analysis.hbase.instance; +import com.fate.common.datasource.annotation.Slave; +import com.shiyi.analysis.domain.VehicleMessage; import com.shiyi.analysis.hbase.config.HbaseConfig; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; +//import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.context.annotation.Bean; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.sql.*; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -21,10 +29,141 @@ import java.util.concurrent.Executors; */ @Log4j2 @Component +@Slave //TIDB数据源 public class HBaseInstance { private ExecutorService pool = Executors.newScheduledThreadPool(20); + /** + * 定义TIDB连接信息 + */ + private static final String TIDBURL = ""; + + private static final String USERNAME = "root"; + + private static final String PASSWORD = "123456"; + + + @KafkaListener (topics = "mqtt", groupId = "Fluxmq_consumer") + public void listen(ConsumerRecord record){ + String message = new String(record.value()); + StringBuilder builder = new StringBuilder(); + String[] split = message.split(" "); + for (String s : split) { + int sh = Integer.parseInt(s, 16); + builder.append((char) sh); + } + VehicleMessage build = VehicleMessage.builder().build(); + try (Connection connection = DriverManager.getConnection(TIDBURL, USERNAME, PASSWORD)){ + String sql = """ + INSERT INTO vehicle_message ( + vin, + createTime, + longitude, + latitude, + speed, + mileage, + voltage, + current, + resistance, + gear, + accelerationPedal, + brakePedal, + fuelConsumptionRate, + motorControllerTemperature, + motorSpeed, + motorTorque, + motorTemperature, + motorVoltage, + motorCurrent, + remainingBattery, + maximumFeedbackPower, + maximumDischargePower, + selfCheckCounter, + totalBatteryCurrent, + totalBatteryVoltage, + singleBatteryMaxVoltage, + singleBatteryMinVoltage, + singleBatteryMaxTemperature, + singleBatteryMinTemperature, + availableBatteryCapacity, + vehicleStatus, + chargingStatus, + operatingStatus, + socStatus, + chargingEnergyStorageStatus, + driveMotorStatus, + positionStatus, + easStatus, + ptcStatus, + epsStatus, + absStatus, + mcuStatus, + heatingStatus, + batteryStatus, + batteryInsulationStatus, + dcdcStatus, + chgStatus) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""" ; + try (PreparedStatement preparedStatement = connection.prepareStatement (sql)) { + // 设置参数 + preparedStatement.setString (1, build.getVin ()); + preparedStatement.setDate (2, (Date) build.getCreateTime()); + preparedStatement.setString (3, build.getLongitude ()); + preparedStatement.setString (4, build.getLatitude ()); + preparedStatement.setString (5, build.getSpeed ()); + preparedStatement.setBigDecimal (6, build.getMileage ()); + preparedStatement.setString (7, build.getVoltage ()); + preparedStatement.setString (8, build.getCurrent ()); + preparedStatement.setString (9, build.getResistance ()); + preparedStatement.setString (10, build.getGear ()); + preparedStatement.setString (11, build.getAccelerationPedal ()); + preparedStatement.setString (12, build.getBrakePedal ()); + preparedStatement.setString (13, build.getFuelConsumptionRate ()); + preparedStatement.setString (14, build.getMotorControllerTemperature ()); + preparedStatement.setString (15, build.getMotorSpeed ()); + preparedStatement.setString (16, build.getMotorTorque ()); + preparedStatement.setString (17, build.getMotorTemperature ()); + preparedStatement.setString (18, build.getMotorVoltage ()); + preparedStatement.setString (19, build.getMotorCurrent ()); + preparedStatement.setBigDecimal (20, build.getRemainingBattery ()); + preparedStatement.setString (21, build.getMaximumFeedbackPower ()); + preparedStatement.setString (22, build.getMaximumDischargePower ()); + preparedStatement.setString (23, build.getSelfCheckCounter ()); + preparedStatement.setString (24, build.getTotalBatteryCurrent ()); + preparedStatement.setString (25, build.getTotalBatteryVoltage ()); + preparedStatement.setString (26, build.getSingleBatteryMaxVoltage ()); + preparedStatement.setString (27, build.getSingleBatteryMinVoltage ()); + preparedStatement.setString (28, build.getSingleBatteryMaxTemperature ()); + preparedStatement.setString (29, build.getSingleBatteryMinTemperature ()); + preparedStatement.setString (30, build.getAvailableBatteryCapacity ()); + preparedStatement.setInt (31, build.getVehicleStatus ()); + preparedStatement.setInt (32, build.getChargingStatus ()); + preparedStatement.setInt (33, build.getOperatingStatus ()); + preparedStatement.setInt (34, build.getSocStatus ()); + preparedStatement.setInt (35, build.getChargingEnergyStorageStatus ()); + preparedStatement.setInt (36, build.getDriveMotorStatus ()); + preparedStatement.setInt (37, build.getPositionStatus ()); + preparedStatement.setInt (38, build.getEasStatus ()); + preparedStatement.setInt (39, build.getPtcStatus ()); + preparedStatement.setInt (40, build.getEpsStatus ()); + preparedStatement.setInt (41, build.getAbsStatus ()); + preparedStatement.setInt (42, build.getMcuStatus ()); + preparedStatement.setInt (43, build.getHeatingStatus ()); + preparedStatement.setInt (44, build.getBatteryStatus ()); + preparedStatement.setInt (45, build.getBatteryInsulationStatus ()); + preparedStatement.setInt(46,build.getDcdcStatus()); + preparedStatement.setInt(47,build.getChgStatus()); + + // 执行插入操作 + preparedStatement.executeUpdate (); + } + }catch (SQLException e){ + System.out.println(e.getMessage()); + } + } + + + @Bean(name = "hbaseConnection") public Connection initConnection(HbaseConfig hbaseConfig){ try { @@ -34,21 +173,34 @@ public class HBaseInstance { for (Map.Entry confEntry : confMaps.entrySet()) { conf.set(confEntry.getKey(),confEntry.getValue()); } - return ConnectionFactory.createConnection(conf,pool); + return null; + //return ConnectionFactory.createConnection(conf,pool); }catch (Exception e){ log.error("连接HBase错误", e); return null; } } + @Bean + public Connection connection() throws SQLException { + EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder().build(); + return embeddedDatabase.getConnection(); + } @Bean(name = "hbaseAdmin") public Admin initAdmin(Connection connection){ try { - return connection.getAdmin(); + return null; + //return connection.getAdmin(); }catch (Exception e){ log.error("创建HBase管理API出错",e); return null; } } + @Bean + public Admin admin() throws IOException { + Configuration config = new Configuration(); + config.set("hbase.zookeeper.quorum", "localhost"); + return ConnectionFactory.createConnection(config).getAdmin(); + } } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java new file mode 100644 index 0000000..52765da --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java @@ -0,0 +1,28 @@ +package com.shiyi.analysis.kafka.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @Description : kafka服务配置 + * @Author : YangHaoYu + * @Date: 2023-11-27 19:37 + */ +@ConfigurationProperties(prefix = "kafka.config") +@Configuration +@Data +public class KafkaConfig { + + /** + * 服务地址 + */ + private String hosts; + + /** + *topic + */ + private String topic; + + +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java new file mode 100644 index 0000000..e2dca6e --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java @@ -0,0 +1,41 @@ +package com.shiyi.analysis.kafka.config; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Description : MQTT连接 + * @Author : YangHaoYu + * @Date: 2023-11-28 16:03 + */ +@Configuration +public class MQTTConfig { + + /** + *MQTT代理服务器地址和端口 + */ + private static final String BROKER_URL = "tcp://182.254.222.21:1883"; + + + /** + *MQTT客户端ID + */ + private static final String CLIENT_ID = "analysis"; + + + @Bean + public MqttClient mqttClient() throws MqttException { + MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID, new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + mqttClient.connect(options); + System.out.println("mqtt开始连接"); + return mqttClient; + } + + +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java new file mode 100644 index 0000000..366034a --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java @@ -0,0 +1,17 @@ +package com.shiyi.analysis.kafka.exception; + +/** + * @Description : kafka异常类 + * @Author : YangHaoYu + * @Date: 2023-11-27 19:41 + */ +public class KafkaException extends RuntimeException { + + public KafkaException(String message){ + super(message); + } + + public KafkaException(String message, Throwable cause){ + super(message, cause); + } +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java new file mode 100644 index 0000000..79dc399 --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java @@ -0,0 +1,37 @@ +package com.shiyi.analysis.kafka.instance; + +import com.fasterxml.jackson.databind.ser.std.StringSerializer; +import com.shiyi.analysis.kafka.config.KafkaConfig; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.Properties; + +/** + * @Description : KafKa生产者 + * @Author : YangHaoYu + * @Date: 2023-11-27 19:44 + */ +@Component +@Log4j2 +public class KafkaInstance { + + @Bean(name = "kafkaProducer") + public Producer createKafKa(KafkaConfig kafkaConfig) { + //创建 KafKa 生产者的配置对象 + Properties properties = new Properties(); + //给 KafKa 配置对象添加配置信息:bootstrap.servers + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getHosts()); + //key,value 序列化(必须):key.serializer,value.serializer + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); + properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1); + //创建KafKa生产者对象 + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + return kafkaProducer; + } +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java new file mode 100644 index 0000000..2decbab --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java @@ -0,0 +1,20 @@ +package com.shiyi.analysis.kafka.service; + +/** + * @Description : + * @Author : YangHaoYu + * @Date: 2023-11-27 19:56 + */ +public interface KafkaService { + public String getTopic(); + + public void sendMessageTakeKey(String topic, String key, String message); + + public void sendMessage(String topic, String message); + + public default void sendMessage(String message){ + sendMessage(getTopic(),message); + } + + public void sendMessageByKey(String key, String message); +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java new file mode 100644 index 0000000..7f928ad --- /dev/null +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java @@ -0,0 +1,97 @@ +package com.shiyi.analysis.kafka.service.impl; + +import com.shiyi.analysis.kafka.config.KafkaConfig; +import com.shiyi.analysis.kafka.config.MQTTConfig; +import com.shiyi.analysis.kafka.service.KafkaService; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * @Description : + * @Author : YangHaoYu + * @Date: 2023-11-27 19:57 + */ +@Service +@Log4j2 +public class KafkaServiceimpl implements KafkaService { + + @Autowired + private KafkaConfig kafkaConfig; + + @Autowired + private KafkaClient kafkaClient; + + @Autowired + private MQTTConfig mqttClient; + + + /** + * mqtt持续拉 + */ + @PostConstruct + public void mqttPush() throws MqttException { + MqttClient client = mqttClient.mqttClient(); + client.subscribe("mqtt",1); + client.setCallback(new MqttCallback() { + /** + * 连接丢失 + * @param throwable + */ + @Override + public void connectionLost(Throwable throwable) { + } + + /** + * 接收到消息 + * @param s + * @param mqttMessage + * @throws Exception + */ + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + /** + * KafKa持续推 + */ + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + } + + + + @Resource(name = "kafkaProducer") + public Producer kafkaProducer; + + @Override + public String getTopic() { + return kafkaConfig.getTopic(); + } + + @Override + public void sendMessageTakeKey(String topic, String key, String message) { + kafkaProducer.send(new ProducerRecord(topic,key,message)); + } + + @Override + public void sendMessage(String topic, String message) { + kafkaProducer.send(new ProducerRecord(topic,message)); + } + + @Override + public void sendMessageByKey(String key, String message) { + kafkaProducer.send(new ProducerRecord(kafkaConfig.getTopic(), key, message)); + } +} diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java index b9a95e0..1a2ccb3 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java @@ -3,7 +3,9 @@ package com.shiyi.analysis.service; import com.shiyi.analysis.domain.VehicleMessage; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.Collection; +import java.util.List; /** * @Description : 数据传输层 @@ -11,9 +13,9 @@ import java.util.Collection; * @Date: 2023-11-22 20:08 */ public interface HbaseService { - Collection scanRange(String tablename, String start, String end); + Collection scanRange(String tablename, String start, String end) throws IOException, InvocationTargetException, IllegalAccessException; - Collection scanRangeByStart(String tablename, String start); + Collection scanRangeByStart(String tablename, String start) throws IOException, InvocationTargetException, IllegalAccessException; void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException; @@ -21,4 +23,34 @@ public interface HbaseService { boolean isisTableExists(String tableName) throws IOException; void createTable(String tableName, String[] columnFamily) throws IOException; + + /** + * 批量插入数据 + * @param tableName 表名 + * @param row 行名 + * @param columnFamilys 列组名 + * @param columns 列名 + * @param values 值 + * @throws IOException + */ + public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values)throws IOException; + + + public void insertOneRecord(String tableName, String row, String columnFamilys, String colums, String value)throws IOException; + + public void deleteRow(String tableName, String rowKey) throws IOException; + + public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException; + + public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException; + + public String selectRow(String tableName, String rowKey) throws IOException; + + public String selectValue(String tableName, String rowKey, String clumnFamily, String column) throws IOException; + + public List scanReportDataByRowKeyword(String tableName, String rowKeyword) throws IOException; + + public void deleteTable(String tableName) throws IOException; + + public void selectAll(String tableName) throws IOException; } diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java index e942fd4..a7144d1 100644 --- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java +++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java @@ -3,24 +3,25 @@ package com.shiyi.analysis.service.impl; import com.shiyi.analysis.BeanUtils; import com.shiyi.analysis.domain.VehicleMessage; import com.shiyi.analysis.service.HbaseService; -import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.swing.*; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; /** @@ -32,15 +33,22 @@ import java.util.stream.Collectors; @Service public class HbaseServiceimpl implements HbaseService { + @Autowired private Admin admin; @Autowired private Connection connection; - @SneakyThrows + /** + * 范围查询 + * @param tablename + * @param start + * @param end + * @return + */ @Override - public Collection scanRange(String tablename, String start, String end) { + public Collection scanRange(String tablename, String start, String end) throws IOException, InvocationTargetException, IllegalAccessException { Table table =connection.getTable(TableName.valueOf(tablename)); Scan scan = new Scan(); //设置起始和结束的rowKey,范围值扫描包前不包后,只能扫描到005不能扫描到006,没有设置范围将会扫描全局 @@ -68,9 +76,14 @@ public class HbaseServiceimpl implements HbaseService { return messages; } - @SneakyThrows + /** + * 根据起始范围查询 + * @param tablename + * @param start + * @return + */ @Override - public Collection scanRangeByStart(String tablename, String start) { + public Collection scanRangeByStart(String tablename, String start) throws IOException, InvocationTargetException, IllegalAccessException { Table table = connection.getTable(TableName.valueOf(tablename)); Scan scan = new Scan(); //设置起始和结束的rowkey,范围值扫描包前不包后:只能扫到0005不能扫到0006.没有设置范围将会全局扫描 @@ -98,6 +111,14 @@ public class HbaseServiceimpl implements HbaseService { return messages; } + /** + *插入记录(单行单列组-多列多值) + * @param tableName + * @param row + * @param columnFamilys + * @param vehicleMessage + * @throws IOException + */ @Override public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException { TableName name = TableName.valueOf(String.valueOf(tableName)); @@ -116,12 +137,24 @@ public class HbaseServiceimpl implements HbaseService { } + /** + * 判断表名是否存在 + * @param tableName + * @return + * @throws IOException + */ @Override public boolean isisTableExists(String tableName) throws IOException { Admin admin = connection.getAdmin(); return admin.tableExists(TableName.valueOf(tableName)); } + /** + * 创建表 + * @param tableName + * @param columnFamily + * @throws IOException + */ @Override public void createTable(String tableName, String[] columnFamily) throws IOException { TableName name = TableName.valueOf(tableName); @@ -139,4 +172,141 @@ public class HbaseServiceimpl implements HbaseService { admin.createTable(desc); } } + + /** + * 批量插入数据 + * + * @param tableName 表名 + * @param row 行名 + * @param columnFamilys 列组名 + * @param columns 列名 + * @param values 值 + * @throws IOException + */ + @Override + public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Put put = new Put(Bytes.toBytes(row)); + for (int i = 0; i < columns.length; i++) { + put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i])); + table.put(put); + } + } + + @Override + public void insertOneRecord(String tableName, String row, String columnFamilys, String colums, String value) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Put put = new Put(Bytes.toBytes(row)); + put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(colums), Bytes.toBytes(value)); + table.put(put); + } + + @Override + public void deleteRow(String tableName, String rowKey) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Delete delete = new Delete(rowKey.getBytes()); + table.delete(delete); + } + + @Override + public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Delete delete = new Delete(rowKey.getBytes()).addFamily(Bytes.toBytes(columnFamily)); + table.delete(delete); + } + + @Override + public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Delete delete = new Delete(rowKey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); + table.delete(delete); + } + + @Override + public String selectRow(String tableName, String rowKey) throws IOException { + String record = ""; + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Get get = new Get(rowKey.getBytes()); + Result rs = table.get(get); + NavigableMap>> map = rs.getMap(); + for (Cell cell : rs.rawCells()) { + StringBuffer stringBuffer = new StringBuffer() + .append(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())).append("\t") + .append(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())).append("\t") + .append(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())).append("\t") + .append(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())).append("\n"); + String string = stringBuffer.toString(); + record += string; + } + return record; + } + + @Override + public String selectValue(String tableName, String rowKey, String clumnFamily, String column) throws IOException { + TableName name = TableName.valueOf(tableName); + Table table = connection.getTable(name); + Get get = new Get(rowKey.getBytes()); + get.addColumn(Bytes.toBytes(clumnFamily), Bytes.toBytes(column)); + Result result = table.get(get); + return Bytes.toString(result.value()); + } + + @Override + public List scanReportDataByRowKeyword(String tableName, String rowKeyword) throws IOException { + ArrayList list = new ArrayList<>(); + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + + //添加行键过滤器,根据关键字匹配 + RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword)); + scan.setFilter(rowFilter); + + ResultScanner scanner = table.getScanner(scan); + try { + for (Result result : scanner) { + //TODO 此处根据业务来自定义实现 + list.add(null); + } + }finally { + if (scanner != null){ + scanner.close(); + } + } + return list; + } + + @Override + public void deleteTable(String tableName) throws IOException { + TableName name = TableName.valueOf(tableName); + if (admin.tableExists(name)){ + admin.disableTable(name); + admin.deleteTable(name); + } + } + + @Override + public void selectAll(String tableName) throws IOException { + Table table = connection.getTable(TableName.valueOf(tableName)); + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan); + Class aClass = VehicleMessage.class; + for (Result result : scanner) { + List cells = result.listCells(); + for (Cell cell : cells) { + String rowKey =Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + String familyName = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); + String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + log.info("rowKey:" +rowKey, "familyName:" +familyName, "columnName:" +columnName, "value:" +value); + } + } + } + + } diff --git a/fate-modules-service/src/main/resources/banner.txt b/fate-modules-service/src/main/resources/banner.txt deleted file mode 100644 index 0dd5eee..0000000 --- a/fate-modules-service/src/main/resources/banner.txt +++ /dev/null @@ -1,2 +0,0 @@ -Spring Boot Version: ${spring-boot.version} -Spring Application Name: ${spring.application.name} diff --git a/fate-modules-service/src/main/resources/bootstrap.yml b/fate-modules-service/src/main/resources/bootstrap.yml index a33cb7c..f4b8003 100644 --- a/fate-modules-service/src/main/resources/bootstrap.yml +++ b/fate-modules-service/src/main/resources/bootstrap.yml @@ -23,9 +23,7 @@ spring: # 共享配置 shared-configs: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} -logging: - level: - com.fate.system.mapper: DEBUG + #mybatis: diff --git a/fate-modules-service/src/main/resources/logback.xml b/fate-modules-service/src/main/resources/logback.xml deleted file mode 100644 index 5c0e12e..0000000 --- a/fate-modules-service/src/main/resources/logback.xml +++ /dev/null @@ -1,74 +0,0 @@ - - - - - - - - - - - ${log.pattern} - - - - - - ${log.path}/info.log - - - - ${log.path}/info.%d{yyyy-MM-dd}.log - - 60 - - - ${log.pattern} - - - - INFO - - ACCEPT - - DENY - - - - - ${log.path}/error.log - - - - ${log.path}/error.%d{yyyy-MM-dd}.log - - 60 - - - ${log.pattern} - - - - ERROR - - ACCEPT - - DENY - - - - - - - - - - - - - - - - - -