diff --git a/fate-modules-common/pom.xml b/fate-modules-common/pom.xml
index 89d4494..42c71f8 100644
--- a/fate-modules-common/pom.xml
+++ b/fate-modules-common/pom.xml
@@ -61,18 +61,6 @@
fate-common-datascope
-
-
- com.fate
- fate-common-log
-
-
-
-
- com.fate
- fate-common-swagger
-
-
diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java
index a672793..652befd 100644
--- a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java
+++ b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/KafkaConstant.java
@@ -9,5 +9,5 @@ public class KafkaConstant {
public static final String ANALYSIS_MESSAGE = "analysis_message";
- public static final String KAFKA_SERVERS = "123.249.90.97:9092";
+ public static final String KAFKA_SERVERS = "182.254.222.21:9092";
}
diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java
index 0b8df14..1cf2c3b 100644
--- a/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java
+++ b/fate-modules-common/src/main/java/com/shiyi/analysis/constants/VehicleConstant.java
@@ -1,7 +1,7 @@
package com.shiyi.analysis.constants;
/**
- * @Description : 报文
+ * @Description : 报文解析类
* @Author : YangHaoYu
* @Date: 2023-11-22 19:35
*/
diff --git a/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java b/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java
index 6117639..0722cf3 100644
--- a/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java
+++ b/fate-modules-common/src/main/java/com/shiyi/analysis/domain/VehicleMessage.java
@@ -1,10 +1,16 @@
package com.shiyi.analysis.domain;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
+import java.math.BigDecimal;
+import java.util.Date;
+
/**
* @Description : 车辆实时数据
* @Author : YangHaoYu
@@ -14,724 +20,289 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
+@TableName("vehicle_message")
public class VehicleMessage {
/**
- * 消息标识
+ * VIN
*/
- private String identifier;
+ @TableId(value = "vin")
+ private String vin;
/**
- * VIN码
+ * 时间戳
*/
- private String vin;
+ @TableField("create_time")
+ private Date createTime;
/**
* 经度
*/
+ @TableField("longitude")
private String longitude;
/**
- * 纬度
+ * 维度
*/
+ @TableField("latitude")
private String latitude;
/**
- * 车速
+ * 速度
*/
+ @TableField("speed")
private String speed;
/**
* 总里程
*/
- private String totalDistance;
+ @TableField("mileage")
+ private BigDecimal mileage;
/**
* 总电压
*/
- private String totalVoltage;
+ @TableField("voltage")
+ private String voltage;
/**
* 总电流
*/
- private String jointCurrent;
+ @TableField("current")
+ private String current;
/**
* 绝缘电阻
*/
- private String insulationResistance;
+ @TableField("resistance")
+ private String resistance;
/**
- * 档位
+ * 挡位
*/
- private String gears;
+ @TableField("gear")
+ private String gear;
/**
* 加速踏板行程值
*/
- private String acceleratorPedal;
+ @TableField("accelerationPedal")
+ private String accelerationPedal;
/**
* 制动踏板行程值
*/
+ @TableField("brakePedal")
private String brakePedal;
/**
* 燃料消耗率
*/
- private String fuelRate;
+ @TableField("fuelConsumptionRate")
+ private String fuelConsumptionRate;
/**
* 电机控制器温度
*/
+ @TableField("motorControllerTemperature")
private String motorControllerTemperature;
/**
* 电机转速
*/
+ @TableField("motorSpeed")
private String motorSpeed;
/**
* 电机转矩
*/
+ @TableField("motorTorque")
private String motorTorque;
/**
* 电机温度
*/
+ @TableField("motorTemperature")
private String motorTemperature;
/**
* 电机电压
*/
+ @TableField("motorVoltage")
private String motorVoltage;
/**
* 电机电流
*/
+ @TableField("motorCurrent")
private String motorCurrent;
/**
* 动力电池剩余电量SOC
*/
- private String dumpEnergy;
+ @TableField("remainingBattery")
+ private BigDecimal remainingBattery;
/**
* 当前状态允许的最大反馈功率
*/
+ @TableField("maximumFeedbackPower")
private String maximumFeedbackPower;
/**
* 当前状态允许最大放电功率
*/
+ @TableField("maximumDischargePower")
private String maximumDischargePower;
/**
* BMS自检计数器
*/
- private String selfCheckingCounter;
+ @TableField("selfCheckCounter")
+ private String selfCheckCounter;
/**
* 动力电池充放电电流
*/
- private String chargingAndDischargingCurrent;
+ @TableField("totalBatteryCurrent")
+ private String totalBatteryCurrent;
/**
* 动力电池负载端总电压V3
*/
- private String totalVoltageAtLoadEnd;
+ @TableField("totalBatteryVoltage")
+ private String totalBatteryVoltage;
/**
* 单次最大电压
*/
- private String maximumVoltage;
+ @TableField("singleBatteryMaxVoltage")
+ private String singleBatteryMaxVoltage;
/**
* 单体电池最低电压
*/
- private String lowestVoltageBattery;
+ @TableField("singleBatteryMinVoltage")
+ private String singleBatteryMinVoltage;
/**
* 单体电池最高温度
*/
- private String maximumTemperatureBattery;
+ @TableField("singleBatteryMaxTemperature")
+ private String singleBatteryMaxTemperature;
/**
* 单体电池最低温度
*/
- private String lowestTemperatureBattery;
+ @TableField("singleBatteryMinTemperature")
+ private String singleBatteryMinTemperature;
/**
* 动力电池可用容量
*/
+ @TableField("availableBatteryCapacity")
private String availableBatteryCapacity;
/**
* 车辆状态
*/
- private Integer vehicleState;
+ @TableField("vehicleStatus")
+ private int vehicleStatus;
/**
* 充电状态
*/
- private Integer chargingState;
+ @TableField("chargingStatus")
+ private int chargingStatus;
/**
* 运行状态
*/
- private Integer runningStatus;
+ @TableField("operatingStatus")
+ private int operatingStatus;
/**
* SOC
*/
- private Integer socStatus;
+ @TableField("socStatus")
+ private int socStatus;
/**
* 可充电储能装置工作状态
*/
- private Integer energyStorageDeviceState;
+ @TableField("chargingEnergyStorageStatus")
+ private int chargingEnergyStorageStatus;
/**
* 驱动电机状态
*/
- private Integer driveMotorCondition;
+ @TableField("driveMotorStatus")
+ private int driveMotorStatus;
/**
* 定位是否有效
*/
- private Integer positioningState;
+ @TableField("positionStatus")
+ private int positionStatus;
/**
- * EAS 电子防窃系统状态
+ * EAS(汽车防盗系统)状态
*/
- private Integer easStatus;
+ @TableField("easStatus")
+ private int easStatus;
/**
- * PTC 传动系统状态
+ * PTC(电动加热器)状态
*/
- private Integer ptcStatus;
+ @TableField("ptcStatus")
+ private int ptcStatus;
/**
- * EPS 蓄电池状态
+ * EPS(电动助力系统)状态
*/
- private Integer epsStatus;
+ @TableField("epsStatus")
+ private int epsStatus;
/**
- * ABS 防滑刹车系统状态
+ * ABS(防抱死)状态
*/
- private Integer absStatus;
+ @TableField("absStatus")
+ private int absStatus;
/**
- * MCU 微处理器状态
+ * MCU(电机/逆变器)状态
*/
- private Integer mcuStatus;
+ @TableField("mcuStatus")
+ private int mcuStatus;
/**
* 动力电池加热状态
*/
- private Integer batteryHeatingCondition;
+ @TableField("heatingStatus")
+ private int heatingStatus;
/**
* 动力电池当前状态
*/
- private Integer currentBatteryStatus;
+ @TableField("batteryStatus")
+ private int batteryStatus;
/**
* 动力电池保温状态
*/
- private Integer batteryInsulationStatus;
-
+ @TableField("batteryInsulationStatus")
+ private int batteryInsulationStatus;
/**
* DCDC 电力交换系统状态
*/
+ @TableField("dcdcStatus")
private Integer dcdcStatus;
/**
* CHG 充电机状态
*/
+ @TableField("chgStatus")
private Integer chgStatus;
- /**
- *时间
- */
- private Long time;
- public Long getTime() {
- return time;
- }
- public void setTime(Object time) {
- if(time != null){
- this.time = Long.valueOf(time.toString());
- }
- }
-
- public String getIdentifier() {
- return identifier;
- }
-
- public void setIdentifier(Object identifier) {
- if(identifier != null) {
- this.identifier = (String) identifier;
- }
- }
-
- public String getVin() {
- return vin;
- }
-
- public void setVin(Object vin) {
- if(vin != null) {
- this.vin = (String) vin;
- }
- }
-
- public String getLongitude() {
- return longitude;
- }
-
- public void setLongitude(Object longitude) {
- if(longitude != null) {
- this.longitude = (String) longitude;
- }
- }
-
- public String getLatitude() {
- return latitude;
- }
-
- public void setLatitude(Object latitude) {
- if(latitude != null) {
- this.latitude = (String) latitude;
- }
- }
-
- public String getSpeed() {
- return speed;
- }
-
- public void setSpeed(Object speed) {
- if(speed != null) {
- this.speed = (String) speed;
- }
- }
-
- public String getTotalDistance() {
- return totalDistance;
- }
-
- public void setTotalDistance(Object totalDistance) {
- if(totalDistance != null) {
- this.totalDistance = (String) totalDistance;
- }
- }
-
- public String getTotalVoltage() {
- return totalVoltage;
- }
-
- public void setTotalVoltage(Object totalVoltage) {
- if(totalVoltage != null) {
- this.totalVoltage = (String) totalVoltage;
- }
- }
-
- public String getJointCurrent() {
- return jointCurrent;
- }
-
- public void setJointCurrent(Object jointCurrent) {
- if(jointCurrent != null) {
- this.jointCurrent = (String) jointCurrent;
- }
- }
-
- public String getInsulationResistance() {
- return insulationResistance;
- }
-
- public void setInsulationResistance(Object insulationResistance) {
- if(insulationResistance != null) {
- this.insulationResistance = (String) insulationResistance;
- }
- }
-
- public String getGears() {
- return gears;
- }
-
- public void setGears(Object gears) {
- if(gears != null) {
- this.gears = (String) gears;
- }
- }
-
- public String getAcceleratorPedal() {
- return acceleratorPedal;
- }
-
- public void setAcceleratorPedal(Object acceleratorPedal) {
- if(acceleratorPedal != null) {
- this.acceleratorPedal = (String) acceleratorPedal;
- }
- }
-
- public String getBrakePedal() {
- return brakePedal;
- }
-
- public void setBrakePedal(Object brakePedal) {
- if(brakePedal != null) {
- this.brakePedal = (String) brakePedal;
- }
- }
-
- public String getFuelRate() {
- return fuelRate;
- }
-
- public void setFuelRate(Object fuelRate) {
- if(fuelRate != null) {
- this.fuelRate = (String) fuelRate;
- }
- }
-
- public String getMotorControllerTemperature() {
- return motorControllerTemperature;
- }
-
- public void setMotorControllerTemperature(Object motorControllerTemperature) {
- if(motorControllerTemperature != null) {
- this.motorControllerTemperature = (String) motorControllerTemperature;
- }
- }
-
- public String getMotorSpeed() {
- return motorSpeed;
- }
-
- public void setMotorSpeed(Object motorSpeed) {
- if(motorSpeed != null) {
- this.motorSpeed = (String) motorSpeed;
- }
- }
-
- public String getMotorTorque() {
- return motorTorque;
- }
-
- public void setMotorTorque(Object motorTorque) {
- if(motorTorque != null) {
- this.motorTorque = (String) motorTorque;
- }
- }
-
- public String getMotorTemperature() {
- return motorTemperature;
- }
-
- public void setMotorTemperature(Object motorTemperature) {
- if(motorTemperature != null) {
- this.motorTemperature = (String) motorTemperature;
- }
- }
-
- public String getMotorVoltage() {
- return motorVoltage;
- }
-
- public void setMotorVoltage(Object motorVoltage) {
- if(motorVoltage != null) {
- this.motorVoltage = (String) motorVoltage;
- }
- }
-
- public String getMotorCurrent() {
- return motorCurrent;
- }
-
- public void setMotorCurrent(Object motorCurrent) {
- if(motorCurrent != null) {
- this.motorCurrent = (String) motorCurrent;
- }
- }
-
- public String getDumpEnergy() {
- return dumpEnergy;
- }
-
- public void setDumpEnergy(Object dumpEnergy) {
- if(dumpEnergy != null) {
- this.dumpEnergy = (String) dumpEnergy;
- }
- }
-
- public String getMaximumFeedbackPower() {
- return maximumFeedbackPower;
- }
-
- public void setMaximumFeedbackPower(Object maximumFeedbackPower) {
- if(maximumFeedbackPower != null) {
- this.maximumFeedbackPower = (String) maximumFeedbackPower;
- }
- }
-
- public String getMaximumDischargePower() {
- return maximumDischargePower;
- }
-
- public void setMaximumDischargePower(Object maximumDischargePower) {
- if(maximumDischargePower != null) {
- this.maximumDischargePower = (String) maximumDischargePower;
- }
- }
-
- public String getSelfCheckingCounter() {
- return selfCheckingCounter;
- }
-
- public void setSelfCheckingCounter(Object selfCheckingCounter) {
- if(selfCheckingCounter != null) {
- this.selfCheckingCounter = (String) selfCheckingCounter;
- }
- }
-
- public String getChargingAndDischargingCurrent() {
- return chargingAndDischargingCurrent;
- }
-
- public void setChargingAndDischargingCurrent(Object chargingAndDischargingCurrent) {
- if(chargingAndDischargingCurrent != null) {
- this.chargingAndDischargingCurrent = (String) chargingAndDischargingCurrent;
- }
- }
-
- public String getTotalVoltageAtLoadEnd() {
- return totalVoltageAtLoadEnd;
- }
-
- public void setTotalVoltageAtLoadEnd(Object totalVoltageAtLoadEnd) {
- if(totalVoltageAtLoadEnd != null) {
- this.totalVoltageAtLoadEnd = (String) totalVoltageAtLoadEnd;
- }
- }
-
- public String getMaximumVoltage() {
- return maximumVoltage;
- }
-
- public void setMaximumVoltage(Object maximumVoltage) {
- if(maximumVoltage != null) {
- this.maximumVoltage = (String) maximumVoltage;
- }
- }
-
- public String getLowestVoltageBattery() {
- return lowestVoltageBattery;
- }
-
- public void setLowestVoltageBattery(Object lowestVoltageBattery) {
- if(lowestVoltageBattery != null) {
- this.lowestVoltageBattery = (String) lowestVoltageBattery;
- }
- }
-
- public String getMaximumTemperatureBattery() {
- return maximumTemperatureBattery;
- }
-
- public void setMaximumTemperatureBattery(Object maximumTemperatureBattery) {
- if(maximumTemperatureBattery != null) {
- this.maximumTemperatureBattery = (String) maximumTemperatureBattery;
- }
- }
-
- public String getLowestTemperatureBattery() {
- return lowestTemperatureBattery;
- }
-
- public void setLowestTemperatureBattery(Object lowestTemperatureBattery) {
- if(lowestTemperatureBattery != null) {
- this.lowestTemperatureBattery = (String) lowestTemperatureBattery;
- }
- }
-
- public String getAvailableBatteryCapacity() {
- return availableBatteryCapacity;
- }
-
- public void setAvailableBatteryCapacity(Object availableBatteryCapacity) {
- if(availableBatteryCapacity != null) {
- this.availableBatteryCapacity = (String) availableBatteryCapacity;
- }
- }
-
- public Integer getVehicleState() {
- return vehicleState;
- }
-
- public void setVehicleState(Object vehicleState) {
- if(vehicleState != null) {
- this.vehicleState = Integer.valueOf(vehicleState.toString());
- }
- }
-
- public Integer getChargingState() {
- return chargingState;
- }
-
- public void setChargingState(Object chargingState) {
- if(chargingState != null) {
- this.chargingState = Integer.valueOf(chargingState.toString());
- }
- }
-
- public Integer getRunningStatus() {
- return runningStatus;
- }
-
- public void setRunningStatus(Object runningStatus) {
- if(runningStatus != null){
- this.runningStatus = Integer.valueOf(runningStatus.toString());
- }
- }
-
- public Integer getSocStatus() {
- return socStatus;
- }
-
- public void setSocStatus(Object socStatus) {
- if(socStatus != null) {
- this.socStatus = Integer.valueOf(socStatus.toString());
- }
- }
-
- public Integer getEnergyStorageDeviceState() {
- return energyStorageDeviceState;
- }
-
- public void setEnergyStorageDeviceState(Object energyStorageDeviceState) {
- if(energyStorageDeviceState != null){
- this.energyStorageDeviceState = Integer.valueOf(energyStorageDeviceState.toString());
- }
- }
-
- public Integer getDriveMotorCondition() {
- return driveMotorCondition;
- }
-
- public void setDriveMotorCondition(Object driveMotorCondition) {
- if(driveMotorCondition != null) {
- this.driveMotorCondition = Integer.valueOf(driveMotorCondition.toString());
- }
- }
-
- public Integer getPositioningState() {
- return positioningState;
- }
-
- public void setPositioningState(Object positioningState) {
- if(positioningState != null) {
- this.positioningState = Integer.valueOf(positioningState.toString());
- }
- }
-
- public Integer getEasStatus() {
- return easStatus;
- }
-
- public void setEasStatus(Object easStatus) {
- if(easStatus != null) {
- this.easStatus = Integer.valueOf(easStatus.toString());
- }
- }
-
- public Integer getPtcStatus() {
- return ptcStatus;
- }
-
- public void setPtcStatus(Object ptcStatus) {
- if(ptcStatus != null) {
- this.ptcStatus = Integer.valueOf(ptcStatus.toString());
- }
- }
-
- public Integer getEpsStatus() {
- return epsStatus;
- }
-
- public void setEpsStatus(Object epsStatus) {
- if(epsStatus != null) {
- this.epsStatus = Integer.valueOf(epsStatus.toString());
- }
- }
-
- public Integer getAbsStatus() {
- return absStatus;
- }
-
- public void setAbsStatus(Object absStatus) {
- if(absStatus != null) {
- this.absStatus = Integer.valueOf(absStatus.toString());
- }
- }
-
- public Integer getMcuStatus() {
- return mcuStatus;
- }
-
- public void setMcuStatus(Object mcuStatus) {
- if(mcuStatus != null) {
- this.mcuStatus = Integer.valueOf(mcuStatus.toString());
- }
- }
-
- public Integer getBatteryHeatingCondition() {
- return batteryHeatingCondition;
- }
-
- public void setBatteryHeatingCondition(Object batteryHeatingCondition) {
- if(batteryHeatingCondition != null) {
- this.batteryHeatingCondition = Integer.valueOf(batteryHeatingCondition.toString());
- }
- }
-
- public Integer getCurrentBatteryStatus() {
- return currentBatteryStatus;
- }
-
- public void setCurrentBatteryStatus(Object currentBatteryStatus) {
- if(currentBatteryStatus != null){
- this.currentBatteryStatus = Integer.valueOf(currentBatteryStatus.toString());
- }
- }
-
- public Integer getBatteryInsulationStatus() {
- return batteryInsulationStatus;
- }
-
- public void setBatteryInsulationStatus(Object batteryInsulationStatus) {
- if(batteryInsulationStatus != null){
- this.batteryInsulationStatus = Integer.valueOf(batteryInsulationStatus.toString());
- }
- }
-
- public Integer getDcdcStatus() {
- return dcdcStatus;
- }
-
- public void setDcdcStatus(Object dcdcStatus) {
- if(dcdcStatus != null) {
- this.dcdcStatus = Integer.valueOf(dcdcStatus.toString());
- }
- }
-
- public Integer getChgStatus() {
- return chgStatus;
- }
-
- public void setChgStatus(Object chgStatus) {
- if(chgStatus != null) {
- this.chgStatus = Integer.valueOf(chgStatus.toString());
- }
- }
}
diff --git a/fate-modules-service/pom.xml b/fate-modules-service/pom.xml
index 7a61f2b..73cfdf6 100644
--- a/fate-modules-service/pom.xml
+++ b/fate-modules-service/pom.xml
@@ -36,6 +36,50 @@
hbase-client
2.4.7
+
+ org.hsqldb
+ hsqldb
+ 2.5.0
+
+
+ org.apache.hadoop
+ hadoop-common
+ 3.3.1
+
+
+ org.apache.hadoop
+ hadoop-auth
+ 3.3.1
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ 3.3.1
+
+
+ org.apache.hbase
+ hbase-client
+ 1.4.7
+
+
+ org.apache.hbase
+ hbase-common
+ 1.4.7
+
+
+ org.apache.hbase
+ hbase-server
+ 1.4.7
+
+
+
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.28
+
@@ -81,6 +125,11 @@
+
+
+ org.springframework.integration
+ spring-integration-mqtt
+
@@ -101,6 +150,14 @@
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 15
+ 15
+
+
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java b/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java
index ba0fc27..0e531e0 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/AnalysisApplication.java
@@ -2,7 +2,6 @@ package com.shiyi.analysis;
import com.fate.common.security.annotation.EnableCustomConfig;
import com.fate.common.security.annotation.EnableMyFeignClients;
-import com.fate.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -12,7 +11,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* @Date: 2023-11-20 22:16
*/
@EnableCustomConfig
-@EnableCustomSwagger2
@EnableMyFeignClients
@SpringBootApplication
public class AnalysisApplication {
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java
index 788e7f4..6e40991 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/config/KafkaConsumer.java
@@ -61,7 +61,7 @@ public class KafkaConsumer {
public void OnMmessage(ConsumerRecord,?> consumerRecord){
//提交到线程池里
alanAnalysisThreadPool.submit(() ->{
-
+ messageHandler(consumerRecord);
});
}
@@ -88,10 +88,10 @@ public class KafkaConsumer {
// 车辆围栏处理
rabbitTemplate.convertSendAndReceive(RabbitConstants.FENCE_HANDLER_QUEUE, JSON.toJSONString(vehicleMessage));
try {
- String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(new Date(vehicleMessage.getTime()));
+ String tableName = HBaseConstant.HBASE_TABLE_PREFIX + tableNamemat.format(vehicleMessage.getCreateTime());
log.info("信息入库;database:{},vin:{},time:{}",tableName,vehicleMessage.getVin(),System.currentTimeMillis());
// 将信息添加到hbase
- hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getTime(),"info",vehicleMessage);
+ hbaseService.insertRecordsForObj(tableName,vehicleMessage.getVin()+ ":" +vehicleMessage.getCreateTime(),"info",vehicleMessage);
}catch (Exception e){
log.info("添加hbase出错:{}",e.getMessage());
}
@@ -99,7 +99,4 @@ public class KafkaConsumer {
}
-
-
-
}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java b/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java
index 32d0f5d..606718f 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/controller/AnalysisController.java
@@ -38,7 +38,7 @@ public class AnalysisController {
@SneakyThrows
@PostMapping("trackHistory")
public Result> trackHistory(@RequestBody DrivingRecord drivingRecord){
- log.info("历史查询 ——开始时间:【{}】,结束时间:【{}】",new Date(Long.parseLong(drivingRecord.getStartKey())).toLocaleString(),new Date(drivingRecord.getEndKey()).toLocaleString());
+ log.info("查询 ——开始时间:【{}】,结束时间:【{}】",new Date(Long.parseLong(drivingRecord.getStartKey())).toLocaleString(),new Date(drivingRecord.getEndKey()).toLocaleString());
//根据输入输出时间格式化日期
String startDay = targetDay.format(new Date(Long.parseLong(drivingRecord.getStartKey())));
String endDay = targetDay.format(new Date(Long.parseLong(drivingRecord.getEndKey())));
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java
index d252218..baba3b4 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandler.java
@@ -3,8 +3,11 @@ package com.shiyi.analysis.handler;
import com.shiyi.analysis.domain.VehicleMessage;
import lombok.extern.log4j.Log4j2;
+import java.math.BigDecimal;
+import java.sql.Date;
+
/**
- * @Description : 报文处理类
+ * @Description : 报文切割处理类
* @Author : YangHaoYu
* @Date: 2023-11-22 20:05
*/
@@ -12,103 +15,106 @@ import lombok.extern.log4j.Log4j2;
public class MessageHandler {
public static VehicleMessage messageTranslated(String content){
+ char start = content.charAt(0);
+ char end = content.charAt(content.length() - 1);
+ System.out.println(start);
+ System.out.println(end);
VehicleMessage vehicleMessage = new VehicleMessage();
- //消息标识
- vehicleMessage.setIdentifier(content.substring(0,12));
//vin码
- vehicleMessage.setVin(content.substring(12,29));
- //经度
- vehicleMessage.setLongitude(content.substring(29,40));
- //纬度
- vehicleMessage.setLatitude(content.substring(40,50));
- //车速
- vehicleMessage.setSpeed(content.substring(50,56));
- //总里程
- vehicleMessage.setTotalDistance(content.substring(56,67));
- //总电压
- vehicleMessage.setTotalVoltage(content.substring(67,73));
- //总电流
- vehicleMessage.setJointCurrent(content.substring(73,78));
- //绝缘电阻
- vehicleMessage.setInsulationResistance(content.substring(78,87));
- //档位
- vehicleMessage.setGears(content.substring(87,88));
- //加速踏板行程值
- vehicleMessage.setAcceleratorPedal(content.substring(88,90));
- //制动踏板行程值
- vehicleMessage.setBrakePedal(content.substring(90,92));
- //燃料消耗率
- vehicleMessage.setFuelRate(content.substring(92,98));
- //电机控制器温度
- vehicleMessage.setMotorControllerTemperature(content.substring(98,103));
- //电机转速
- vehicleMessage.setMotorSpeed(content.substring(103,108));
- //电机转矩
- vehicleMessage.setMotorTorque(content.substring(108,112));
- //电机温度
- vehicleMessage.setMotorTemperature(content.substring(112,118));
- //电机电压
- vehicleMessage.setMotorVoltage(content.substring(118,123));
- //电机电流
- vehicleMessage.setMotorCurrent(content.substring(123,132));
- //动力电池剩余电量SOC
- vehicleMessage.setDumpEnergy(content.substring(132,137));
- //当前状态允许的最大反馈功率
- vehicleMessage.setMaximumFeedbackPower(content.substring(137,143));
- //当前状态允许最大放电功率
- vehicleMessage.setMaximumDischargePower(content.substring(143,149));
- //BMS自检计数器
- vehicleMessage.setSelfCheckingCounter(content.substring(149,151));
- //动力电池充放电电流
- vehicleMessage.setChargingAndDischargingCurrent(content.substring(151,156));
- //动力电池负载端总电压V3
- vehicleMessage.setTotalVoltageAtLoadEnd(content.substring(156,162));
- //单次最大电压
- vehicleMessage.setMaximumVoltage(content.substring(162,166));
- //单体电池最低电压
- vehicleMessage.setLowestVoltageBattery(content.substring(166,170));
- //单体电池最高温度
- vehicleMessage.setMaximumTemperatureBattery(content.substring(170,176));
- //单体电池最低温度
- vehicleMessage.setLowestTemperatureBattery(content.substring(176,182));
- //动力电池可用容量
- vehicleMessage.setAvailableBatteryCapacity(content.substring(182,188));
- //车辆状态
- vehicleMessage.setVehicleState(Integer.valueOf(content.substring(188,189)));
- //充电状态
- vehicleMessage.setChargingState(Integer.valueOf(content.substring(189,190)));
- //运行状态
- vehicleMessage.setRunningStatus(Integer.valueOf(content.substring(190,191)));
- //Soc状态
- vehicleMessage.setSocStatus(Integer.valueOf(content.substring(191,192)));
- //可充电储能装置工作状态
- vehicleMessage.setEnergyStorageDeviceState(Integer.valueOf(content.substring(192,193)));
- //驱动电机状态
- vehicleMessage.setDriveMotorCondition(Integer.valueOf(content.substring(193,194)));
- //定位是否有效
- vehicleMessage.setPositioningState(Integer.valueOf(content.substring(194,195)));
- //EAS 电子防窃系统状态
- vehicleMessage.setEasStatus(Integer.valueOf(content.substring(195,196)));
- //PTC 传动系统状态
- vehicleMessage.setPtcStatus(Integer.valueOf(content.substring(196,197)));
- //EPS 蓄电池状态
- vehicleMessage.setEpsStatus(Integer.valueOf(content.substring(197,198)));
- //ABS 防滑刹车系统状态
- vehicleMessage.setAbsStatus(Integer.valueOf(content.substring(198,199)));
- //MCU 微处理器状态
- vehicleMessage.setMcuStatus(Integer.valueOf(content.substring(199,200)));
- //动力电池加热状态
- vehicleMessage.setBatteryHeatingCondition(Integer.valueOf(content.substring(200,201)));
- //动力电池当前状态
- vehicleMessage.setCurrentBatteryStatus(Integer.valueOf(content.substring(201,202)));
- //动力电池保温状态
- vehicleMessage.setBatteryInsulationStatus(Integer.valueOf(content.substring(202,203)));
- //DCDC 电力交换系统状态
- vehicleMessage.setDcdcStatus(Integer.valueOf(content.substring(203,204)));
- //CHG 充电机状态
- vehicleMessage.setChgStatus(Integer.valueOf(content.substring(204,205)));
+ vehicleMessage.setVin(content.substring(2,18));
//截取从车辆接受端获取端时间戳
- vehicleMessage.setTime(Long.valueOf(content.substring(205,218)));
+ vehicleMessage.setCreateTime(Date.valueOf(content.substring(18,31)));
+ //经度
+ vehicleMessage.setLongitude(content.substring(31,42));
+ //纬度
+ vehicleMessage.setLatitude(content.substring(42,52));
+ //车速
+ vehicleMessage.setSpeed(content.substring(52,58));
+ //总里程
+ vehicleMessage.setMileage(new BigDecimal(content.substring(58,69)));
+ //总电压
+ vehicleMessage.setVoltage(content.substring(69,75));
+ //总电流
+ vehicleMessage.setCurrent(content.substring(75,80));
+ //绝缘电阻
+ vehicleMessage.setResistance(content.substring(80,89));
+ //档位
+ vehicleMessage.setGear(content.substring(89,90));
+ //加速踏板行程值
+ vehicleMessage.setAccelerationPedal(content.substring(90,92));
+ //制动踏板行程值
+ vehicleMessage.setBrakePedal(content.substring(92,94));
+ //燃料消耗率
+ vehicleMessage.setFuelConsumptionRate(content.substring(94,99));
+ //电机控制器温度
+ vehicleMessage.setMotorControllerTemperature(content.substring(99,105));
+ //电机转速
+ vehicleMessage.setMotorSpeed(content.substring(105,110));
+ //电机转矩
+ vehicleMessage.setMotorTorque(content.substring(110,114));
+ //电机温度
+ vehicleMessage.setMotorTemperature(content.substring(114,120));
+ //电机电压
+ vehicleMessage.setMotorVoltage(content.substring(120,125));
+ //电机电流
+ vehicleMessage.setMotorCurrent(content.substring(125,133));
+ //动力电池剩余电量SOC
+ vehicleMessage.setRemainingBattery(new BigDecimal(content.substring(133,139)));
+ //当前状态允许的最大反馈功率
+ vehicleMessage.setMaximumFeedbackPower(content.substring(139,145));
+ //当前状态允许最大放电功率
+ vehicleMessage.setMaximumDischargePower(content.substring(145,151));
+ //BMS自检计数器
+ vehicleMessage.setSelfCheckCounter(content.substring(151,153));
+ //动力电池充放电电流
+ vehicleMessage.setTotalBatteryCurrent(content.substring(153,158));
+ //动力电池负载端总电压V3
+ vehicleMessage.setTotalBatteryVoltage(content.substring(158,164));
+ //单次最大电压
+ vehicleMessage.setSingleBatteryMaxVoltage(content.substring(164,168));
+ //单体电池最低电压
+ vehicleMessage.setSingleBatteryMinVoltage(content.substring(168,172));
+ //单体电池最高温度
+ vehicleMessage.setSingleBatteryMaxTemperature(content.substring(172,178));
+ //单体电池最低温度
+ vehicleMessage.setSingleBatteryMinTemperature(content.substring(178,184));
+ //动力电池可用容量
+ vehicleMessage.setAvailableBatteryCapacity(content.substring(184,190));
+ //车辆状态
+ vehicleMessage.setVehicleStatus(Integer.valueOf(content.substring(190,191)));
+ //充电状态
+ vehicleMessage.setChargingStatus(Integer.valueOf(content.substring(191,192)));
+ //运行状态
+ vehicleMessage.setOperatingStatus(Integer.valueOf(content.substring(192,193)));
+ //Soc状态
+ vehicleMessage.setSocStatus(Integer.valueOf(content.substring(193,194))); //Soc状态
+ //可充电储能装置工作状态
+ vehicleMessage.setChargingEnergyStorageStatus(Integer.valueOf(content.substring(194,195)));
+ //驱动电机状态
+ vehicleMessage.setDriveMotorStatus(Integer.valueOf(content.substring(195,196)));
+ //定位是否有效
+ vehicleMessage.setPositionStatus(Integer.valueOf(content.substring(196,197)));
+ //EAS 电子防窃系统状态
+ vehicleMessage.setEasStatus(Integer.valueOf(content.substring(197,198)));
+ //PTC 传动系统状态
+ vehicleMessage.setPtcStatus(Integer.valueOf(content.substring(198,199)));
+ //EPS 蓄电池状态
+ vehicleMessage.setEpsStatus(Integer.valueOf(content.substring(199,200)));
+ //ABS 防滑刹车系统状态
+ vehicleMessage.setAbsStatus(Integer.valueOf(content.substring(200,201)));
+ //MCU 微处理器状态
+ vehicleMessage.setMcuStatus(Integer.valueOf(content.substring(201,202)));
+ //动力电池加热状态
+ vehicleMessage.setHeatingStatus(Integer.valueOf(content.substring(202,203)));
+ //动力电池当前状态
+ vehicleMessage.setBatteryStatus(Integer.valueOf(content.substring(203,204)));
+ //动力电池保温状态
+ vehicleMessage.setBatteryInsulationStatus(Integer.valueOf(content.substring(204,205)));
+ //DCDC 电力交换系统状态
+ vehicleMessage.setDcdcStatus(Integer.valueOf(content.substring(205,206)));
+ //CHG 充电机状态
+ vehicleMessage.setChgStatus(Integer.valueOf(content.substring(207,208)));
+
return vehicleMessage;
}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java
new file mode 100644
index 0000000..3dbcc4f
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/handler/MessageHandlerRegular.java
@@ -0,0 +1,93 @@
+package com.shiyi.analysis.handler;
+
+import com.shiyi.analysis.constants.VehicleConstant;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @Description : 报文处理规则
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 09:33
+ */
+@Log4j2
+public class MessageHandlerRegular {
+
+ /**
+ * 校验位计算
+ * @param message
+ * @return
+ */
+ public static boolean checkHandlerRule(String message) {
+ if (!message.contains(VehicleConstant.DATA_PACK_SEPARATOR)){
+ return false;
+ }
+ String temp = message.replaceAll("#\\$&\\*", "");
+ if (message.indexOf(VehicleConstant.MSG_START) != 0){
+ return false;
+ }
+ if ((message.lastIndexOf(VehicleConstant.MSG_END)+2) != temp.length()){
+ return false;
+ }
+ String content = message.substring(2, message.length() - 9);
+ String data = content.replace(" ", "");
+ int dSum = 0;
+ int length = data.length();
+ int index = 0;
+ //遍历十六进制,并计算总合
+ while (index < length){
+ //截取2位字符
+ String hex = data.substring(index, index + 2);
+ //十六进制转成十进制 , 并计算十进制的总和
+ int i = Integer.parseInt(hex, 16);
+ dSum += i;
+ index += 2;
+ }
+ //用256取余,十六进制最大是FF,FF的十进制是255
+ int mod = dSum % 256;
+ //余数转成十六进制
+ String checkSumHex = Integer.toHexString(mod);
+ length = checkSumHex.length();
+ if (length < 2){
+ //校验位不足两位的,在前面补0
+ checkSumHex ="0" +checkSumHex;
+ }
+ String parityBit = message.substring(message.length() - 9, message.length() - 7);
+ return parityBit.equals(checkSumHex);
+ }
+
+
+ /**
+ * 将十六进制转为字符
+ * @param hexString
+ * @return
+ */
+ public static String theDecimal(String hexString){
+ String[] split = hexString.split(" ");
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String info : split) {
+ if (StringUtils.isNotEmpty(info)){
+ String decimalTemp = valueToAscii(Integer.valueOf(info, 16).toString());
+ stringBuilder.append(decimalTemp);
+ }
+ }
+ return stringBuilder.toString();
+ }
+
+
+ /**
+ *将10进制数据转为字符
+ * @param value
+ * @return
+ */
+ public static String valueToAscii(String value){
+ StringBuilder builder = new StringBuilder();
+ String[] chars = value.split(",");
+ for (String info : chars) {
+ builder.append((char)Integer.parseInt(info));
+ }
+ return builder.toString();
+ }
+
+
+
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java
index 2e79edd..ef8c304 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/config/HbaseConfig.java
@@ -29,4 +29,6 @@ public class HbaseConfig {
public void setConfMaps(Map confMaps) {
this.confMaps = confMaps;
}
+
+
}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java
index 8387ea9..8f63a6f 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/hbase/instance/HBaseInstance.java
@@ -1,15 +1,23 @@
package com.shiyi.analysis.hbase.instance;
+import com.fate.common.datasource.annotation.Slave;
+import com.shiyi.analysis.domain.VehicleMessage;
import com.shiyi.analysis.hbase.config.HbaseConfig;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
+//import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.sql.*;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -21,10 +29,141 @@ import java.util.concurrent.Executors;
*/
@Log4j2
@Component
+@Slave //TIDB数据源
public class HBaseInstance {
private ExecutorService pool = Executors.newScheduledThreadPool(20);
+ /**
+ * 定义TIDB连接信息
+ */
+ private static final String TIDBURL = "";
+
+ private static final String USERNAME = "root";
+
+ private static final String PASSWORD = "123456";
+
+
+ @KafkaListener (topics = "mqtt", groupId = "Fluxmq_consumer")
+ public void listen(ConsumerRecord record){
+ String message = new String(record.value());
+ StringBuilder builder = new StringBuilder();
+ String[] split = message.split(" ");
+ for (String s : split) {
+ int sh = Integer.parseInt(s, 16);
+ builder.append((char) sh);
+ }
+ VehicleMessage build = VehicleMessage.builder().build();
+ try (Connection connection = DriverManager.getConnection(TIDBURL, USERNAME, PASSWORD)){
+ String sql = """
+ INSERT INTO vehicle_message (
+ vin,
+ createTime,
+ longitude,
+ latitude,
+ speed,
+ mileage,
+ voltage,
+ current,
+ resistance,
+ gear,
+ accelerationPedal,
+ brakePedal,
+ fuelConsumptionRate,
+ motorControllerTemperature,
+ motorSpeed,
+ motorTorque,
+ motorTemperature,
+ motorVoltage,
+ motorCurrent,
+ remainingBattery,
+ maximumFeedbackPower,
+ maximumDischargePower,
+ selfCheckCounter,
+ totalBatteryCurrent,
+ totalBatteryVoltage,
+ singleBatteryMaxVoltage,
+ singleBatteryMinVoltage,
+ singleBatteryMaxTemperature,
+ singleBatteryMinTemperature,
+ availableBatteryCapacity,
+ vehicleStatus,
+ chargingStatus,
+ operatingStatus,
+ socStatus,
+ chargingEnergyStorageStatus,
+ driveMotorStatus,
+ positionStatus,
+ easStatus,
+ ptcStatus,
+ epsStatus,
+ absStatus,
+ mcuStatus,
+ heatingStatus,
+ batteryStatus,
+ batteryInsulationStatus,
+ dcdcStatus,
+ chgStatus) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""" ;
+ try (PreparedStatement preparedStatement = connection.prepareStatement (sql)) {
+ // 设置参数
+ preparedStatement.setString (1, build.getVin ());
+ preparedStatement.setDate (2, (Date) build.getCreateTime());
+ preparedStatement.setString (3, build.getLongitude ());
+ preparedStatement.setString (4, build.getLatitude ());
+ preparedStatement.setString (5, build.getSpeed ());
+ preparedStatement.setBigDecimal (6, build.getMileage ());
+ preparedStatement.setString (7, build.getVoltage ());
+ preparedStatement.setString (8, build.getCurrent ());
+ preparedStatement.setString (9, build.getResistance ());
+ preparedStatement.setString (10, build.getGear ());
+ preparedStatement.setString (11, build.getAccelerationPedal ());
+ preparedStatement.setString (12, build.getBrakePedal ());
+ preparedStatement.setString (13, build.getFuelConsumptionRate ());
+ preparedStatement.setString (14, build.getMotorControllerTemperature ());
+ preparedStatement.setString (15, build.getMotorSpeed ());
+ preparedStatement.setString (16, build.getMotorTorque ());
+ preparedStatement.setString (17, build.getMotorTemperature ());
+ preparedStatement.setString (18, build.getMotorVoltage ());
+ preparedStatement.setString (19, build.getMotorCurrent ());
+ preparedStatement.setBigDecimal (20, build.getRemainingBattery ());
+ preparedStatement.setString (21, build.getMaximumFeedbackPower ());
+ preparedStatement.setString (22, build.getMaximumDischargePower ());
+ preparedStatement.setString (23, build.getSelfCheckCounter ());
+ preparedStatement.setString (24, build.getTotalBatteryCurrent ());
+ preparedStatement.setString (25, build.getTotalBatteryVoltage ());
+ preparedStatement.setString (26, build.getSingleBatteryMaxVoltage ());
+ preparedStatement.setString (27, build.getSingleBatteryMinVoltage ());
+ preparedStatement.setString (28, build.getSingleBatteryMaxTemperature ());
+ preparedStatement.setString (29, build.getSingleBatteryMinTemperature ());
+ preparedStatement.setString (30, build.getAvailableBatteryCapacity ());
+ preparedStatement.setInt (31, build.getVehicleStatus ());
+ preparedStatement.setInt (32, build.getChargingStatus ());
+ preparedStatement.setInt (33, build.getOperatingStatus ());
+ preparedStatement.setInt (34, build.getSocStatus ());
+ preparedStatement.setInt (35, build.getChargingEnergyStorageStatus ());
+ preparedStatement.setInt (36, build.getDriveMotorStatus ());
+ preparedStatement.setInt (37, build.getPositionStatus ());
+ preparedStatement.setInt (38, build.getEasStatus ());
+ preparedStatement.setInt (39, build.getPtcStatus ());
+ preparedStatement.setInt (40, build.getEpsStatus ());
+ preparedStatement.setInt (41, build.getAbsStatus ());
+ preparedStatement.setInt (42, build.getMcuStatus ());
+ preparedStatement.setInt (43, build.getHeatingStatus ());
+ preparedStatement.setInt (44, build.getBatteryStatus ());
+ preparedStatement.setInt (45, build.getBatteryInsulationStatus ());
+ preparedStatement.setInt(46,build.getDcdcStatus());
+ preparedStatement.setInt(47,build.getChgStatus());
+
+ // 执行插入操作
+ preparedStatement.executeUpdate ();
+ }
+ }catch (SQLException e){
+ System.out.println(e.getMessage());
+ }
+ }
+
+
+
@Bean(name = "hbaseConnection")
public Connection initConnection(HbaseConfig hbaseConfig){
try {
@@ -34,21 +173,34 @@ public class HBaseInstance {
for (Map.Entry confEntry : confMaps.entrySet()) {
conf.set(confEntry.getKey(),confEntry.getValue());
}
- return ConnectionFactory.createConnection(conf,pool);
+ return null;
+ //return ConnectionFactory.createConnection(conf,pool);
}catch (Exception e){
log.error("连接HBase错误", e);
return null;
}
}
+ @Bean
+ public Connection connection() throws SQLException {
+ EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder().build();
+ return embeddedDatabase.getConnection();
+ }
@Bean(name = "hbaseAdmin")
public Admin initAdmin(Connection connection){
try {
- return connection.getAdmin();
+ return null;
+ //return connection.getAdmin();
}catch (Exception e){
log.error("创建HBase管理API出错",e);
return null;
}
}
+ @Bean
+ public Admin admin() throws IOException {
+ Configuration config = new Configuration();
+ config.set("hbase.zookeeper.quorum", "localhost");
+ return ConnectionFactory.createConnection(config).getAdmin();
+ }
}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java
new file mode 100644
index 0000000..52765da
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/KafkaConfig.java
@@ -0,0 +1,28 @@
+package com.shiyi.analysis.kafka.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Description : kafka服务配置
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 19:37
+ */
+@ConfigurationProperties(prefix = "kafka.config")
+@Configuration
+@Data
+public class KafkaConfig {
+
+ /**
+ * 服务地址
+ */
+ private String hosts;
+
+ /**
+ *topic
+ */
+ private String topic;
+
+
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java
new file mode 100644
index 0000000..e2dca6e
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/config/MQTTConfig.java
@@ -0,0 +1,41 @@
+package com.shiyi.analysis.kafka.config;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Description : MQTT连接
+ * @Author : YangHaoYu
+ * @Date: 2023-11-28 16:03
+ */
+@Configuration
+public class MQTTConfig {
+
+ /**
+ *MQTT代理服务器地址和端口
+ */
+ private static final String BROKER_URL = "tcp://182.254.222.21:1883";
+
+
+ /**
+ *MQTT客户端ID
+ */
+ private static final String CLIENT_ID = "analysis";
+
+
+ @Bean
+ public MqttClient mqttClient() throws MqttException {
+ MqttClient mqttClient = new MqttClient(BROKER_URL, CLIENT_ID, new MemoryPersistence());
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ mqttClient.connect(options);
+ System.out.println("mqtt开始连接");
+ return mqttClient;
+ }
+
+
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java
new file mode 100644
index 0000000..366034a
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/exception/KafkaException.java
@@ -0,0 +1,17 @@
+package com.shiyi.analysis.kafka.exception;
+
+/**
+ * @Description : kafka异常类
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 19:41
+ */
+public class KafkaException extends RuntimeException {
+
+ public KafkaException(String message){
+ super(message);
+ }
+
+ public KafkaException(String message, Throwable cause){
+ super(message, cause);
+ }
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java
new file mode 100644
index 0000000..79dc399
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/instance/KafkaInstance.java
@@ -0,0 +1,37 @@
+package com.shiyi.analysis.kafka.instance;
+
+import com.fasterxml.jackson.databind.ser.std.StringSerializer;
+import com.shiyi.analysis.kafka.config.KafkaConfig;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+/**
+ * @Description : KafKa生产者
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 19:44
+ */
+@Component
+@Log4j2
+public class KafkaInstance {
+
+ @Bean(name = "kafkaProducer")
+ public Producer createKafKa(KafkaConfig kafkaConfig) {
+ //创建 KafKa 生产者的配置对象
+ Properties properties = new Properties();
+ //给 KafKa 配置对象添加配置信息:bootstrap.servers
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getHosts());
+ //key,value 序列化(必须):key.serializer,value.serializer
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
+ properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);
+ //创建KafKa生产者对象
+ KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
+ return kafkaProducer;
+ }
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java
new file mode 100644
index 0000000..2decbab
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/KafkaService.java
@@ -0,0 +1,20 @@
+package com.shiyi.analysis.kafka.service;
+
+/**
+ * @Description :
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 19:56
+ */
+public interface KafkaService {
+ public String getTopic();
+
+ public void sendMessageTakeKey(String topic, String key, String message);
+
+ public void sendMessage(String topic, String message);
+
+ public default void sendMessage(String message){
+ sendMessage(getTopic(),message);
+ }
+
+ public void sendMessageByKey(String key, String message);
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java
new file mode 100644
index 0000000..7f928ad
--- /dev/null
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/kafka/service/impl/KafkaServiceimpl.java
@@ -0,0 +1,97 @@
+package com.shiyi.analysis.kafka.service.impl;
+
+import com.shiyi.analysis.kafka.config.KafkaConfig;
+import com.shiyi.analysis.kafka.config.MQTTConfig;
+import com.shiyi.analysis.kafka.service.KafkaService;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+/**
+ * @Description :
+ * @Author : YangHaoYu
+ * @Date: 2023-11-27 19:57
+ */
+@Service
+@Log4j2
+public class KafkaServiceimpl implements KafkaService {
+
+ @Autowired
+ private KafkaConfig kafkaConfig;
+
+ @Autowired
+ private KafkaClient kafkaClient;
+
+ @Autowired
+ private MQTTConfig mqttClient;
+
+
+ /**
+ * mqtt持续拉
+ */
+ @PostConstruct
+ public void mqttPush() throws MqttException {
+ MqttClient client = mqttClient.mqttClient();
+ client.subscribe("mqtt",1);
+ client.setCallback(new MqttCallback() {
+ /**
+ * 连接丢失
+ * @param throwable
+ */
+ @Override
+ public void connectionLost(Throwable throwable) {
+ }
+
+ /**
+ * 接收到消息
+ * @param s
+ * @param mqttMessage
+ * @throws Exception
+ */
+ @Override
+ public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+ /**
+ * KafKa持续推
+ */
+
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+ }
+ });
+ }
+
+
+
+ @Resource(name = "kafkaProducer")
+ public Producer kafkaProducer;
+
+ @Override
+ public String getTopic() {
+ return kafkaConfig.getTopic();
+ }
+
+ @Override
+ public void sendMessageTakeKey(String topic, String key, String message) {
+ kafkaProducer.send(new ProducerRecord(topic,key,message));
+ }
+
+ @Override
+ public void sendMessage(String topic, String message) {
+ kafkaProducer.send(new ProducerRecord(topic,message));
+ }
+
+ @Override
+ public void sendMessageByKey(String key, String message) {
+ kafkaProducer.send(new ProducerRecord(kafkaConfig.getTopic(), key, message));
+ }
+}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java
index b9a95e0..1a2ccb3 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/HbaseService.java
@@ -3,7 +3,9 @@ package com.shiyi.analysis.service;
import com.shiyi.analysis.domain.VehicleMessage;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
+import java.util.List;
/**
* @Description : 数据传输层
@@ -11,9 +13,9 @@ import java.util.Collection;
* @Date: 2023-11-22 20:08
*/
public interface HbaseService {
- Collection extends VehicleMessage> scanRange(String tablename, String start, String end);
+ Collection extends VehicleMessage> scanRange(String tablename, String start, String end) throws IOException, InvocationTargetException, IllegalAccessException;
- Collection extends VehicleMessage> scanRangeByStart(String tablename, String start);
+ Collection extends VehicleMessage> scanRangeByStart(String tablename, String start) throws IOException, InvocationTargetException, IllegalAccessException;
void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException;
@@ -21,4 +23,34 @@ public interface HbaseService {
boolean isisTableExists(String tableName) throws IOException;
void createTable(String tableName, String[] columnFamily) throws IOException;
+
+ /**
+ * 批量插入数据
+ * @param tableName 表名
+ * @param row 行名
+ * @param columnFamilys 列组名
+ * @param columns 列名
+ * @param values 值
+ * @throws IOException
+ */
+ public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values)throws IOException;
+
+
+ public void insertOneRecord(String tableName, String row, String columnFamilys, String colums, String value)throws IOException;
+
+ public void deleteRow(String tableName, String rowKey) throws IOException;
+
+ public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException;
+
+ public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException;
+
+ public String selectRow(String tableName, String rowKey) throws IOException;
+
+ public String selectValue(String tableName, String rowKey, String clumnFamily, String column) throws IOException;
+
+ public List scanReportDataByRowKeyword(String tableName, String rowKeyword) throws IOException;
+
+ public void deleteTable(String tableName) throws IOException;
+
+ public void selectAll(String tableName) throws IOException;
}
diff --git a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java
index e942fd4..a7144d1 100644
--- a/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java
+++ b/fate-modules-service/src/main/java/com/shiyi/analysis/service/impl/HbaseServiceimpl.java
@@ -3,24 +3,25 @@ package com.shiyi.analysis.service.impl;
import com.shiyi.analysis.BeanUtils;
import com.shiyi.analysis.domain.VehicleMessage;
import com.shiyi.analysis.service.HbaseService;
-import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.swing.*;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
/**
@@ -32,15 +33,22 @@ import java.util.stream.Collectors;
@Service
public class HbaseServiceimpl implements HbaseService {
+
@Autowired
private Admin admin;
@Autowired
private Connection connection;
- @SneakyThrows
+ /**
+ * 范围查询
+ * @param tablename
+ * @param start
+ * @param end
+ * @return
+ */
@Override
- public Collection extends VehicleMessage> scanRange(String tablename, String start, String end) {
+ public Collection extends VehicleMessage> scanRange(String tablename, String start, String end) throws IOException, InvocationTargetException, IllegalAccessException {
Table table =connection.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();
//设置起始和结束的rowKey,范围值扫描包前不包后,只能扫描到005不能扫描到006,没有设置范围将会扫描全局
@@ -68,9 +76,14 @@ public class HbaseServiceimpl implements HbaseService {
return messages;
}
- @SneakyThrows
+ /**
+ * 根据起始范围查询
+ * @param tablename
+ * @param start
+ * @return
+ */
@Override
- public Collection extends VehicleMessage> scanRangeByStart(String tablename, String start) {
+ public Collection extends VehicleMessage> scanRangeByStart(String tablename, String start) throws IOException, InvocationTargetException, IllegalAccessException {
Table table = connection.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();
//设置起始和结束的rowkey,范围值扫描包前不包后:只能扫到0005不能扫到0006.没有设置范围将会全局扫描
@@ -98,6 +111,14 @@ public class HbaseServiceimpl implements HbaseService {
return messages;
}
+ /**
+ *插入记录(单行单列组-多列多值)
+ * @param tableName
+ * @param row
+ * @param columnFamilys
+ * @param vehicleMessage
+ * @throws IOException
+ */
@Override
public void insertRecordsForObj(String tableName, String row, String columnFamilys, VehicleMessage vehicleMessage) throws IOException {
TableName name = TableName.valueOf(String.valueOf(tableName));
@@ -116,12 +137,24 @@ public class HbaseServiceimpl implements HbaseService {
}
+ /**
+ * 判断表名是否存在
+ * @param tableName
+ * @return
+ * @throws IOException
+ */
@Override
public boolean isisTableExists(String tableName) throws IOException {
Admin admin = connection.getAdmin();
return admin.tableExists(TableName.valueOf(tableName));
}
+ /**
+ * 创建表
+ * @param tableName
+ * @param columnFamily
+ * @throws IOException
+ */
@Override
public void createTable(String tableName, String[] columnFamily) throws IOException {
TableName name = TableName.valueOf(tableName);
@@ -139,4 +172,141 @@ public class HbaseServiceimpl implements HbaseService {
admin.createTable(desc);
}
}
+
+ /**
+ * 批量插入数据
+ *
+ * @param tableName 表名
+ * @param row 行名
+ * @param columnFamilys 列组名
+ * @param columns 列名
+ * @param values 值
+ * @throws IOException
+ */
+ @Override
+ public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Put put = new Put(Bytes.toBytes(row));
+ for (int i = 0; i < columns.length; i++) {
+ put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
+ table.put(put);
+ }
+ }
+
+ @Override
+ public void insertOneRecord(String tableName, String row, String columnFamilys, String colums, String value) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Put put = new Put(Bytes.toBytes(row));
+ put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(colums), Bytes.toBytes(value));
+ table.put(put);
+ }
+
+ @Override
+ public void deleteRow(String tableName, String rowKey) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Delete delete = new Delete(rowKey.getBytes());
+ table.delete(delete);
+ }
+
+ @Override
+ public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Delete delete = new Delete(rowKey.getBytes()).addFamily(Bytes.toBytes(columnFamily));
+ table.delete(delete);
+ }
+
+ @Override
+ public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Delete delete = new Delete(rowKey.getBytes()).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
+ table.delete(delete);
+ }
+
+ @Override
+ public String selectRow(String tableName, String rowKey) throws IOException {
+ String record = "";
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Get get = new Get(rowKey.getBytes());
+ Result rs = table.get(get);
+ NavigableMap>> map = rs.getMap();
+ for (Cell cell : rs.rawCells()) {
+ StringBuffer stringBuffer = new StringBuffer()
+ .append(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())).append("\t")
+ .append(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())).append("\t")
+ .append(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())).append("\t")
+ .append(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())).append("\n");
+ String string = stringBuffer.toString();
+ record += string;
+ }
+ return record;
+ }
+
+ @Override
+ public String selectValue(String tableName, String rowKey, String clumnFamily, String column) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ Table table = connection.getTable(name);
+ Get get = new Get(rowKey.getBytes());
+ get.addColumn(Bytes.toBytes(clumnFamily), Bytes.toBytes(column));
+ Result result = table.get(get);
+ return Bytes.toString(result.value());
+ }
+
+ @Override
+ public List scanReportDataByRowKeyword(String tableName, String rowKeyword) throws IOException {
+ ArrayList