feat: 新增mqtt客户端监听服务器节点,获取报文并解析

master
yaoxin 2024-06-05 22:24:22 +08:00
parent e5c26727ac
commit 8382839122
6 changed files with 515 additions and 25 deletions

View File

@ -0,0 +1,36 @@
package com.muyu.mqttmessage.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.UUID;
/**
* @ClassName MqttMessageInfo
* @Description
* @Author Xin.Yao
* @Date 2024/5/30 8:41
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class MqttMessageModel {
private String broker;
private String topic;
private String username;
private String password;
private String clientId;
public static MqttMessageModel builderMqttMessage(String broker, String topic,String username,String password) {
return MqttMessageModel.builder()
.broker(broker)
.topic(topic)
.clientId(UUID.randomUUID().toString())
.username(username)
.password(password)
.build();
}
}

View File

@ -1,11 +1,9 @@
package com.muyu.mqttmessage.config;
import com.muyu.loadcenter.common.MqttMessageModel;
import com.muyu.loadcenter.service.impl.MqttCallBackServiceImpl;
import com.muyu.mqttmessage.common.MqttMessageModel;
import com.muyu.mqttmessage.service.impl.MqttCallBackServiceImpl;
import lombok.AllArgsConstructor;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
@ -19,19 +17,67 @@ import org.springframework.stereotype.Service;
@AllArgsConstructor
public class MqttFactory {
private final MqttCallBackServiceImpl mqttCallBackService;
public MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
private static MqttCallBackServiceImpl mqttCallBackService;
// public static void main(String[] args){
// String broker = "tcp://43.142.44.217:1883";
// String topic = "mqtt001";
// String username = "emqx";
// String password = "public";
// String clientid = "subscribe_client";
// int qos = 0;
//
// try {
// MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// // 连接参数
// MqttConnectOptions options = new MqttConnectOptions();
// options.setUserName(username);
// options.setPassword(password.toCharArray());
// options.setConnectionTimeout(60);
// options.setKeepAliveInterval(60);
// // 设置回调
// client.setCallback(new MqttCallback() {
//
// public void connectionLost(Throwable cause) {
// System.out.println("connectionLost: " + cause.getMessage());
// }
//
// public void messageArrived(String topic, MqttMessage message) {
// System.out.println("topic: " + topic);
// System.out.println("Qos: " + message.getQos());
// System.out.println("message content: " + new String(message.getPayload()));
//
// }
//
// public void deliveryComplete(IMqttDeliveryToken token) {
// System.out.println("deliveryComplete---------" + token.isComplete());
// }
//
// });
// client.connect(options);
// client.subscribe(topic, qos);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
public static void main(String[] args) {
MqttMessageModel mqttMessageModel1 = MqttMessageModel.builderMqttMessage("tcp://43.142.44.217:1883", "mqtt001","1111","22222");
MqttFactory.createMqttClient(mqttMessageModel1);
MqttMessageModel mqttMessageModel2 = MqttMessageModel.builderMqttMessage("tcp://47.98.170.220:1883", "mqtt002","1111","22222");
MqttFactory.createMqttClient(mqttMessageModel2);
}
public static MqttClient createMqttClient(MqttMessageModel mqttMessageModel) {
MqttClient client =null;
int qos = 0;
try {
client = new MqttClient(mqttMessageModel.getBroker(), mqttMessageModel.getClientid(), new MemoryPersistence());
client = new MqttClient(mqttMessageModel.getBroker(), mqttMessageModel.getClientId(), new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttMessageModel.getUsername());
options.setPassword(mqttMessageModel.getPassword().toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
client.setCallback(mqttCallBackService);
client.setCallback(new MqttCallBackServiceImpl());
client.connect(options);
client.subscribe(mqttMessageModel.getTopic(), qos);
} catch (MqttException e) {

View File

@ -0,0 +1,261 @@
package com.muyu.mqttmessage.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.math.BigDecimal;
/**
* @ClassName VehicleData
* @Description
* @Author Xin.Yao
* @Date 2024/6/5 6:52
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class VehicleData {
/**
* VIN
*/
private String vin;
/**
* 线
*/
private String drivingRoute;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
/**
*
*/
private String current;
/**
*
*/
private String resistance;
/**
*
*/
private String gear = "P";
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
private String motorSpeed;
/**
*
*/
private String motorTorque;
/**
*
*/
private String motorTemperature;
/**
*
*/
private String motorVoltage;
/**
*
*/
private String motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private BigDecimal batteryLevel;
/**
*
*/
private String maximumFeedbackPower;
/**
*
*/
private String maximumDischargePower;
/**
* BMS
*/
private String selfCheckCounter;
/**
*
*/
private String totalBatteryCurrent;
/**
* V3
*/
private String totalBatteryVoltage;
/**
*
*/
private String singleBatteryMaxVoltage;
/**
*
*/
private String singleBatteryMinVoltage;
/**
*
*/
private String singleBatteryMaxTemperature;
/**
*
*/
private String singleBatteryMinTemperature;
/**
*
*/
private String availableBatteryCapacity;
/**
*
*/
private int vehicleStatus = 1;
/**
*
*/
private int chargingStatus = 1;
/**
*
*/
private int operatingStatus = 1;
/**
* SOC
*/
private int socStatus = 1;
/**
*
*/
private int chargingEnergyStorageStatus = 1;
/**
*
*/
private int driveMotorStatus = 1;
/**
*
*/
private int positionStatus = 1;
/**
* EAS()
*/
private int easStatus = 1;
/**
* PTC()
*/
private int ptcStatus = 1;
/**
* EPS()
*/
private int epsStatus = 1;
/**
* ABS()
*/
private int absStatus = 1;
/**
* MCU(/)
*/
private int mcuStatus = 1;
/**
*
*/
private int heatingStatus = 1;
/**
*
*/
private int batteryStatus = 1;
/**
*
*/
private int batteryInsulationStatus = 1;
/**
* DCDC()
*/
private int dcdcStatus = 1;
/**
* CHG()
*/
private int chgStatus = 1;
}

View File

@ -1,10 +1,16 @@
package com.muyu.mqttmessage.service.impl;
import com.alibaba.fastjson2.JSON;
import com.muyu.mqttmessage.domain.VehicleData;
import com.muyu.mqttmessage.utils.ConversionUtil;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
/**
* @ClassName MqttCallBackConfig
* @Description
@ -12,6 +18,7 @@ import org.springframework.stereotype.Component;
* @Date 2024/5/30 8:36
*/
@Component
@Log4j2
public class MqttCallBackServiceImpl implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
@ -20,13 +27,125 @@ public class MqttCallBackServiceImpl implements MqttCallback {
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + new String(message.getPayload()));
log.info("服务器{}监听的报文: {}" ,topic, ConversionUtil.hexStringToString(new String(message.getPayload())));
log.info("转化对象:{}", JSON.toJSONString(getVehicleData(ConversionUtil.hexStringToString(new String(message.getPayload())))));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public VehicleData getVehicleData(String message) {
message = message.substring(1,message.length()-2);
return VehicleData.builder()
//17
.vin(message.substring(0,17))
// 当前时间戳 13
.drivingRoute(message.substring(17,30))
//第二位经度 longitude latitude 11
.longitude(message.substring(30,41))
//第三位维度 longitude latitude 10
.latitude(message.substring(41,51))
//车速 6
.speed(removeSuperfluousDigit(message.substring(51,57)))
//总里程 11
.mileage(BigDecimal.valueOf(Double.valueOf(message.substring(57,68))))
// 总电压 6
.voltage(removeSuperfluousDigit(message.substring(68,74)))
//总电流 5
.current(removeSuperfluousDigit(message.substring(74,79)))
//绝缘电阻 79 - 87 9
.resistance(removeSuperfluousDigit(message.substring(79,88)))
//档位
.gear(message.substring(88,89))
// 加速踏板行程值 2
.accelerationPedal(removeSuperfluousDigit(message.substring(89,91)))
// 制动踏板行程值 2
.brakePedal(removeSuperfluousDigit(message.substring(91,93)))
// 燃料消耗率 5
.fuelConsumptionRate(removeSuperfluousDigit(message.substring(93,98)))
//电机控制器温度 6
.motorControllerTemperature(removeSuperfluousDigit(message.substring(98,104)))
//电机转速 5
.motorSpeed(removeSuperfluousDigit(message.substring(104,109)))
//点击转矩 4
.motorTorque(removeSuperfluousDigit(message.substring(109,113)))
//电机温度 6
.motorTemperature(removeSuperfluousDigit(message.substring(113,119)))
//电机电压 5
.motorVoltage(removeSuperfluousDigit(message.substring(119,124)))
//电机电流 8
.motorCurrent(removeSuperfluousDigit(message.substring(124,132)))
//动力电池剩余电量SOC 6
.remainingBattery(BigDecimal.valueOf(Double.valueOf(message.substring(132,138))))
//当前状态允许的最大反馈功率 6
.maximumFeedbackPower(removeSuperfluousDigit(message.substring(138,144)))
//当前状态允许最大放电功率 6
.maximumDischargePower(removeSuperfluousDigit(message.substring(144,150)))
//BMS自检计数器 2
.selfCheckCounter(removeSuperfluousDigit(message.substring(150,152)))
//动力电池充放电电流 5
.totalBatteryCurrent(removeSuperfluousDigit(message.substring(152,157)))
//动力电池负载端总电压V3 6
.totalBatteryVoltage(removeSuperfluousDigit(message.substring(157,163)))
//单次最大电压 4
.singleBatteryMaxVoltage(removeSuperfluousDigit(message.substring(163,167)))
//单体电池最低电压 4
.singleBatteryMinVoltage(removeSuperfluousDigit(message.substring(167,171)))
//单体电池最高温度 4
.singleBatteryMaxTemperature(removeSuperfluousDigit(message.substring(171,177)))
//单体电池最低温度 6
.singleBatteryMinTemperature(removeSuperfluousDigit(message.substring(177,183)))
//动力电池可用容量 6
.availableBatteryCapacity(removeSuperfluousDigit(message.substring(183,189)))
//车辆状态
.vehicleStatus(Integer.valueOf(message.substring(189,190)))
//充电状态
.chargingStatus(Integer.valueOf(message.substring(190,191)))
//运行状态
.operatingStatus(Integer.valueOf(message.substring(191,192)))
//SOC
.socStatus(Integer.valueOf(message.substring(192,193)))
//可充电储能装置工作状态
.chargingEnergyStorageStatus(Integer.valueOf(message.substring(193,194)))
//驱动电机状态
.driveMotorStatus(Integer.valueOf(message.substring(194,195)))
//定位是否有效
.positionStatus(Integer.valueOf(message.substring(195,196)))
//EAS
.easStatus(Integer.valueOf(message.substring(196,197)))
//PTC
.ptcStatus(Integer.valueOf(message.substring(197,198)))
//EPS
.epsStatus(Integer.valueOf(message.substring(198,199)))
//ABS
.absStatus(Integer.valueOf(message.substring(199,200)))
//MCU
.mcuStatus(Integer.valueOf(message.substring(200,201)))
//动力电池加热状态
.heatingStatus(Integer.valueOf(message.substring(201,202)))
//动力电池当前状态
.batteryStatus(Integer.valueOf(message.substring(202,203)))
//动力电池保温状态
.batteryInsulationStatus(Integer.valueOf(message.substring(203,204)))
//DCDC
.dcdcStatus(Integer.valueOf(message.substring(204,205)))
//CHG
.chgStatus(Integer.valueOf(message.substring(205,206)))
.build();
}
public String removeSuperfluousDigit(String str){
if(str.length()>1){
if(str.charAt(0)=='0'){
return removeSuperfluousDigit(str.substring(1));
}else{
return str;
}
}else{
return str;
}
}
}

View File

@ -0,0 +1,41 @@
package com.muyu.mqttmessage.utils;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @ClassName ConversionUtil
* @Description
* @Author Xin.Yao
* @Date 2024/6/5 6:32
*/
@Component
public class ConversionUtil {
/**
* 16
* @param hexString
* @return hexString
*/
public static String hexStringToString(String hexString){
if (hexString == null || "".equals(hexString)){
return null;
}
hexString = hexString.replace(" ","");
byte[] bytes = new byte[hexString.length() / 2];
for (int i = 0; i < bytes.length; i++){
try {
bytes[i] = (byte) (0xff & Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16));
}catch (Exception e){
e.printStackTrace();
}
}
try{
hexString = new String(bytes, StandardCharsets.UTF_8);
}catch (Exception e){
e.printStackTrace();
}
return hexString;
}
}

View File

@ -1,13 +0,0 @@
package com.muyu.mqttmessage;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqttMessageApplicationTests {
@Test
void contextLoads() {
}
}