feat():1、开始存入kafka--topic--分区完成,可推入其他分区

2、学习spring下线执行事件
master
Saisai Liu 2024-06-18 18:21:28 +08:00
parent 7b5da28692
commit 86cccdeace
8 changed files with 7 additions and 532 deletions

View File

@ -27,7 +27,7 @@ public class Vehicle implements Serializable {
/** /**
* *
*/ */
private String timestamp; private String startTime;
/** /**
* *
*/ */

View File

@ -1,520 +0,0 @@
//package com.mobai.domian;
//
//
//import lombok.AllArgsConstructor;
//import lombok.Builder;
//import lombok.Data;
//import lombok.NoArgsConstructor;
//
//import java.math.BigDecimal;
//
//
//
///**
// * 报文解析
// * @author Mobai
// * @Classname VehicleData
// * @Description 车辆模拟数据对象
// * @Date 2021/8/5
// */
//@Data
//@Builder
//@NoArgsConstructor
//@AllArgsConstructor
//public class VehicleData {
// /**
// * VIN
// */
// private String vin;
// /**
// * 行驶路线
// */
// private String drivingRoute;
//
// /**
// * 经度
// */
// private String longitude;
//
// /**
// * 维度
// */
// private String latitude;
//
// /**
// * 速度
// */
// private String speed;
//
// /**
// * 里程
// */
// private BigDecimal mileage;
//
// /**
// * 总电压
// */
// private String voltage;
//
// /**
// * 总电流
// */
// private String current;
//
// /**
// * 绝缘电阻
// */
// private String resistance;
//
// /**
// * 档位
// */
// private String gear = "P";
//
// /**
// * 加速踏板行程值
// */
// private String accelerationPedal;
//
// /**
// * 制动踏板行程值
// */
// private String brakePedal;
//
// /**
// * 燃料消耗率
// */
// private String fuelConsumptionRate;
//
// /**
// * 电机控制器温度
// */
// private String motorControllerTemperature;
//
// /**
// * 电机转速
// */
// private String motorSpeed;
//
// /**
// * 电机转矩
// */
// private String motorTorque;
//
// /**
// * 电机温度
// */
// private String motorTemperature;
//
// /**
// * 电机电压
// */
// private String motorVoltage;
//
// /**
// * 电机电流
// */
// private String motorCurrent;
//
// /**
// * 动力电池剩余电量SOC
// */
// private BigDecimal remainingBattery;
//
// /**
// * 电池总容量
// */
// private BigDecimal batteryLevel;
//
// /**
// * 当前状态允许的最大反馈功率
// */
// private String maximumFeedbackPower;
//
// /**
// * 当前状态允许最大放电功率
// */
// private String maximumDischargePower;
//
// /**
// * BMS自检计数器
// */
// private String selfCheckCounter;
//
// /**
// * 动力电池充放电电流
// */
// private String totalBatteryCurrent;
//
// /**
// * 动力电池负载端总电压V3
// */
// private String totalBatteryVoltage;
//
// /**
// * 单次最大电压
// */
// private String singleBatteryMaxVoltage;
//
// /**
// * 单体电池最低电压
// */
// private String singleBatteryMinVoltage;
//
// /**
// * 单体电池最高温度
// */
// private String singleBatteryMaxTemperature;
//
// /**
// * 单体电池最低温度
// */
// private String singleBatteryMinTemperature;
//
// /**
// * 动力电池可用容量
// */
// private String availableBatteryCapacity;
//
// /**
// * 车辆状态
// */
// private int vehicleStatus = 1;
//
// /**
// * 充电状态
// */
// private int chargingStatus = 1;
//
// /**
// * 运行状态
// */
// private int operatingStatus = 1;
//
// /**
// * SOC
// */
// private int socStatus = 1;
//
// /**
// * 可充电储能装置工作状态
// */
// private int chargingEnergyStorageStatus = 1;
//
// /**
// * 驱动电机状态
// */
// private int driveMotorStatus = 1;
//
// /**
// * 定位是否有效
// */
// private int positionStatus = 1;
//
// /**
// * EAS(汽车防盗系统)状态
// */
// private int easStatus = 1;
//
// /**
// * PTC(电动加热器)状态
// */
// private int ptcStatus = 1;
//
// /**
// * EPS(电动助力系统)状态
// */
// private int epsStatus = 1;
//
// /**
// * ABS(防抱死)状态
// */
// private int absStatus = 1;
//
// /**
// * MCU(电机/逆变器)状态
// */
// private int mcuStatus = 1;
//
// /**
// * 动力电池加热状态
// */
// private int heatingStatus = 1;
//
// /**
// * 动力电池当前状态
// */
// private int batteryStatus = 1;
//
// /**
// * 动力电池保温状态
// */
// private int batteryInsulationStatus = 1;
//
// /**
// * DCDC(电力交换系统)状态
// */
// private int dcdcStatus = 1;
//
// /**
// * CHG(充电机)状态
// */
// private int chgStatus = 1;
//
// /**
// * 车辆状态 报文
// */
// private String vehicleStatusMsg;
// /**
// * 智能硬件 报文
// */
// private String smartHardwareMsg;
// /**
// * 电池报文
// */
// private String batteryMsg;
//
// public String getMsg(){
// //第一位VIN
// return vin +
// // 当前时间戳
// System.currentTimeMillis() +
// //第二位经度 longitude latitude
// getValue(longitude, 11) +
// //第三位维度 longitude latitude
// getValue(latitude, 10) +
// //车速
// getValue(speed, 6) +
// //总里程
// getValue(mileage == null ? "" : mileage.toString(), 11) +
// // 总电压
// getValue(voltage, 6) +
// //总电流
// getValue(current, 5) +
// //绝缘电阻 79 - 87
// getValue(resistance, 9) +
// //档位
// (gear == null ? "D" : gear) +
// // 加速踏板行程值
// getValue(accelerationPedal, 2) +
// // 制动踏板行程值
// getValue(brakePedal, 2) +
// // 燃料消耗率
// getValue(fuelConsumptionRate, 5) +
// //电机控制器温度
// getValue(motorControllerTemperature, 6) +
// //电机转速
// getValue(motorSpeed, 5) +
// //点击转矩
// getValue(motorTorque, 4) +
// //电机温度
// getValue(motorTemperature, 6) +
// //电机电压
// getValue(motorVoltage, 5) +
// //电机电流
// getValue(motorCurrent, 8) +
// //动力电池剩余电量SOC
// getValue(remainingBattery == null ? "" : remainingBattery.toString(), 6) +
// //当前状态允许的最大反馈功率
// getValue(maximumFeedbackPower, 6) +
// //当前状态允许最大放电功率
// getValue(maximumDischargePower, 6) +
// //BMS自检计数器
// getValue(selfCheckCounter, 2) +
// //动力电池充放电电流
// getValue(totalBatteryCurrent, 5) +
// //动力电池负载端总电压V3
// getValue(totalBatteryVoltage, 6) +
// //单次最大电压
// getValue(singleBatteryMaxVoltage, 4) +
// //单体电池最低电压
// getValue(singleBatteryMinVoltage, 4) +
// //单体电池最高温度
// getValue(singleBatteryMaxTemperature, 6) +
// //单体电池最低温度
// getValue(singleBatteryMinTemperature, 6) +
// //动力电池可用容量
// getValue(availableBatteryCapacity, 6) +
// //车辆状态
// vehicleStatus +
// //充电状态
// chargingStatus +
// //运行状态
// operatingStatus +
// //SOC
// socStatus +
// //可充电储能装置工作状态
// chargingEnergyStorageStatus +
// //驱动电机状态
// driveMotorStatus +
// //定位是否有效
// positionStatus +
// //EAS
// easStatus +
// //PTC
// ptcStatus +
// //EPS
// epsStatus +
// //ABS
// absStatus +
// //MCU
// mcuStatus +
// //动力电池加热状态
// heatingStatus +
// //动力电池当前状态
// batteryStatus +
// //动力电池保温状态
// batteryInsulationStatus +
// //DCDC
// dcdcStatus +
// //CHG
// chgStatus;
// }
//
// public String getValue(String val , int valLength){
// if(val == null){
// val = "";
// }
// int length = val.length();
// if (length > valLength){
// return val.substring( 0 , valLength);
// }
// val = val + "0".repeat(valLength - length);
// return val;
// }
//
// /**
// * 汽车对象构造企业VIN
// * @param vehicle 汽车对象
// * @return 汽车数据对象
// */
// public static VehicleData vehicleBuild (Vehicle vehicle) {
// return VehicleData.builder()
// .vin(vehicle.getVin())
// .gear("P")
// .remainingBattery(vehicle.getRemainingBattery())
// .batteryLevel(vehicle.getBatteryLevel())
// .mileage(vehicle.getTotalMileage())
// .vehicleStatus(1)
// .chargingStatus(1)
// .operatingStatus(1)
// .socStatus(1)
// .chargingEnergyStorageStatus(1)
// .driveMotorStatus(1)
// .positionStatus(1)
// .easStatus(1)
// .ptcStatus(1)
// .epsStatus(1)
// .absStatus(1)
// .mcuStatus(1)
// .heatingStatus(1)
// .batteryStatus(1)
// .batteryInsulationStatus(1)
// .dcdcStatus(1)
// .chgStatus(1)
// .build();
// }
//
// /**
// * 模拟基础项
// */
// public void imitateBase(){
// // 总电压
// this.voltage = genValue(110, 750);
// // 总电流
// this.current = genValue(3, 50);
// // 绝缘电阻
// this.resistance = genValue(0,30000);
// // 加速踏板行程值
// this.accelerationPedal = genValue(0, 10);
// // 制动踏板行程值
// this.brakePedal = genValue(0, 10);
// }
//
//
// /**
// * 模拟电机数据
// */
// public void imitateMotor(){
// // 电机控制器温度
// this.motorControllerTemperature = genValue(0, 100);
// // 电机转速
// this.motorSpeed = genValue(0, 99999);
// // 电机转矩
// this.motorTorque = genValue(0, 1000);
// // 电机温度
// this.motorTemperature = genValue(0, 150);
// // 电机电压
// this.motorVoltage = genValue(110, 300);
// // 电机电流
// this.motorCurrent = genValue(0, 15000);
// }
//
// /**
// * 模拟电池包数据
// */
// public void imitateBatteryPack(){
// // 当前状态允许的最大反馈功率
// this.maximumFeedbackPower = genValue(0, 100);
// // 当前状态允许最大放电功率
// this.maximumDischargePower = genValue(0, 100);
// // BMS自检计数器
// this.selfCheckCounter = genValue(0, 15);
// // 动力电池充放电电流
// this.totalBatteryCurrent = genValue(0, 15);
// // 动力电池负载端总电压V3
// this.totalBatteryVoltage = genValue(220, 750);
// // 单体电池最高电压
// this.singleBatteryMaxVoltage = genValue(3, 5);
// // 单体电池最低电压
// this.singleBatteryMinVoltage = genValue(3, 5);
// // 单体电池最高温度
// this.singleBatteryMaxTemperature = genValue(0, 100);
// // 单体电池最低温度
// this.singleBatteryMinTemperature = genValue(0, 100);
// // 动力电池可用容量
// this.availableBatteryCapacity = genValue(0,100 );
// }
// /**
// 车辆状态
// vehicleStatus;
// 充电状态
// chargingStatus;
// 运行状态
// operatingStatus;
// SOC
// socStatus;
// 可充电储能装置工作状态
// chargingEnergyStorageStatus;
// 驱动电机状态
// driveMotorStatus;
// 定位是否有效
// positionStatus;
// */
//
// /**
// EAS(汽车防盗系统)状态
// easStatus;
// PTC(电动加热器)状态
// ptcStatus;
// EPS(电动助力系统)状态
// epsStatus;
// ABS(防抱死)状态
// absStatus;
// MCU(电机/逆变器)状态
// mcuStatus;
// */
//
// /**
// 动力电池加热状态
// heatingStatus;
// 动力电池当前状态
// batteryStatus;
// 动力电池保温状态
// batteryInsulationStatus;
// DCDC(电力交换系统)状态
// dcdcStatus;
// CHG(充电机)状态
// chgStatus;
// */
//}

View File

@ -30,7 +30,6 @@ public class CustomPartitioner implements Partitioner {
synchronized (this) { synchronized (this) {
int chosenPartition = currentPartitionIndex; int chosenPartition = currentPartitionIndex;
currentPartitionIndex = (currentPartitionIndex + 1) % numPartitions; currentPartitionIndex = (currentPartitionIndex + 1) % numPartitions;
// 根据权重选择分区 // 根据权重选择分区
int cumulativeWeight = 0; int cumulativeWeight = 0;
for (int i = 0; i < numPartitions; i++) { for (int i = 0; i < numPartitions; i++) {
@ -40,7 +39,6 @@ public class CustomPartitioner implements Partitioner {
break; break;
} }
} }
System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + chosenPartition); System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + chosenPartition);
return chosenPartition; return chosenPartition;
} }

View File

@ -69,7 +69,7 @@ public class KafkaPCUtils {
vehicleString, vehicleString,
vehicleString.getBytes(), vehicleString.getBytes(),
new Cluster( new Cluster(
"iYl5vA6ESGaoH5veXYGroQ", "uqU0Vo8_TiaV2Xp0kzczaA",
list, list,
partitionInfos, partitionInfos,
new HashSet<String>() {}, new HashSet<String>() {},

View File

@ -9,7 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* create.topic * create.topic
* @ClassName MessageHandler * @ClassName MessageHandler
* @Description * @Description
* @Author Mobai * @Author Mobai

View File

@ -56,7 +56,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
String trainMsg = str.substring(1, str.length() - 2); String trainMsg = str.substring(1, str.length() - 2);
// 解析报文具体数据 // 解析报文具体数据
String vin = trainMsg.substring(0, 17); // 车辆vin String vin = trainMsg.substring(0, 17); // 车辆vin
String timestamp = trainMsg.substring(17, 30); // 时间戳 String startTime = trainMsg.substring(17, 30); // 时间戳
String longitude = trainMsg.substring(30, 41); // 经度 String longitude = trainMsg.substring(30, 41); // 经度
String latitude = trainMsg.substring(41, 51); // 纬度 String latitude = trainMsg.substring(41, 51); // 纬度
String speed = trainMsg.substring(51, 57); // 速度 String speed = trainMsg.substring(51, 57); // 速度
@ -105,7 +105,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
Integer chgStatus = Integer.valueOf(trainMsg.substring(204, 205)); // CHG(充电机)状态 Integer chgStatus = Integer.valueOf(trainMsg.substring(204, 205)); // CHG(充电机)状态
Vehicle vehicle = new Vehicle(vin, Vehicle vehicle = new Vehicle(vin,
timestamp, startTime,
longitude, longitude,
latitude, latitude,
speed, speed,
@ -152,7 +152,7 @@ public class MqttCallBackServiceImpl implements MqttCallback {
chgStatus); chgStatus);
System.out.println( System.out.println(
"车辆vin:" + vin + "\n\t" + "车辆vin:" + vin + "\n\t" +
"时间戳:" + timestamp + "\n\t" + "时间戳:" + startTime + "\n\t" +
"经度:" + longitude + "\n\t" + "经度:" + longitude + "\n\t" +
"纬度:" + latitude + "\n\t" + "纬度:" + latitude + "\n\t" +
"速度:" + speed + "\n\t" + "速度:" + speed + "\n\t" +

View File

@ -20,9 +20,6 @@ import javax.servlet.ServletException;
@Component @Component
public class ConnectMqtt { public class ConnectMqtt {
@Autowired
private RedisService redisService;
public void connectMq(MqttServerModel mqttServerModel) throws ServletException { public void connectMq(MqttServerModel mqttServerModel) throws ServletException {
if (mqttServerModel == null) { if (mqttServerModel == null) {
throw new ServletException("连接参数不能为空"); throw new ServletException("连接参数不能为空");

View File

@ -4,7 +4,7 @@ spring:
application: application:
name: mobai-mq name: mobai-mq
rabbitmq: rabbitmq:
host: 43.142.100.73 host: 175.24.138.82
stream: stream:
username: guest username: guest
password: guest password: guest