分流解析+存储完成

Signed-off-by: fuck_zhn <2218834824@qq.com>
master
张海宁 2023-11-28 14:36:51 +08:00
parent 243e3faecf
commit fc96bf8412
10 changed files with 574 additions and 89 deletions

View File

@ -5,12 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/** /**
* @author : Administrator * @author : Administrator
* @Description : mqtt * @Description :
*/ */
@SpringBootApplication @SpringBootApplication
public class MqttDecodeMain { public class ShuntModuleMain {
public static void main (String[] args) { public static void main (String[] args) {
SpringApplication.run (MqttDecodeMain.class, args); SpringApplication.run (ShuntModuleMain.class, args);
} }
} }

View File

@ -8,10 +8,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*; import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.support.serializer.JsonSerializer;
@ -22,49 +20,68 @@ import java.util.Map;
@EnableKafka @EnableKafka
public class KafkaConfig { public class KafkaConfig {
@Value ("${spring.kafka.bootstrap-servers}") public static final String KAFKA_ADMIN_CLIENT_ID = "mqtt_to_kafka";
@Value ("${kafka.bootstrap-servers}")
private String bootstrapServers; private String bootstrapServers;
@Value ("${kafka.consumer.group-id}")
@Value ("${spring.kafka.consumer.group-id}")
private String groupId; private String groupId;
/**
* KafkaTemplate
*
* @return KafkaTemplate
*/
@Bean @Bean
public KafkaAdmin kafkaAdmin () { public KafkaTemplate<String, Object> kafkaTemplate () {
Map<String, Object> configs = new HashMap<> (); // 返回新的 KafkaTemplate 对象
configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); ProducerFactory<String, Object> prop = producerFactory ();
return new KafkaAdmin (configs); return new KafkaTemplate<> (prop);
} }
/**
*
*
* @return ProducerFactory
*/
@Bean @Bean
public ProducerFactory<String, Object> producerFactory () { public ProducerFactory<String, Object> producerFactory () {
// 创建 Kafka 生产者配置
Map<String, Object> configs = new HashMap<> (); Map<String, Object> configs = new HashMap<> ();
configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put (ProducerConfig.CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID);
configs.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); configs.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 增加 max.poll.interval.ms 配置
configs.put (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 设置为 5 分钟(单位为毫秒)
// 减小 max.poll.records 配置
configs.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 设置每次 poll 返回的最大记录数为 100 条
// 返回新的 DefaultKafkaProducerFactory 对象
return new DefaultKafkaProducerFactory<> (configs); return new DefaultKafkaProducerFactory<> (configs);
} }
@Bean @Bean
public KafkaTemplate<String, Object> kafkaTemplate (ProducerFactory<String, Object> producerFactory) { public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory () {
return new KafkaTemplate<> (producerFactory); ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<> ();
} factory.setConsumerFactory (consumerFactory ());
return factory;
@Bean
public KafkaMessageListenerContainer<String, Object> messageListenerContainer () {
ContainerProperties containerProps = new ContainerProperties ("my_topic_name");
containerProps.setAckMode (AckMode.MANUAL_IMMEDIATE);
containerProps.setMessageListener (new KafkaMessageListener ());
return new KafkaMessageListenerContainer<> (consumerFactory (), containerProps);
} }
/**
*
*
* @return ConsumerFactory
*/
@Bean @Bean
public ConsumerFactory<String, Object> consumerFactory () { public ConsumerFactory<String, Object> consumerFactory () {
// 创建 Kafka 消费者配置
Map<String, Object> props = new HashMap<> (); Map<String, Object> props = new HashMap<> ();
props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put (JsonDeserializer.TRUSTED_PACKAGES, "*"); props.put (JsonDeserializer.TRUSTED_PACKAGES, "*");
// 返回新的 DefaultKafkaConsumerFactory 对象
return new DefaultKafkaConsumerFactory<> (props); return new DefaultKafkaConsumerFactory<> (props);
} }

View File

@ -1,29 +0,0 @@
package com.zhn.decode.config;
/**
* @author : Administrator
* @description : kafka
*/
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
public class KafkaMessageListener implements MessageListener<String, Object> {
/**
*
*
* @param record
*/
@Override
public void onMessage (ConsumerRecord<String, Object> record) {
// 获取消息的键
String key = record.key ();
// 获取消息的值
Object value = record.value ();
// 在这里处理接收到的消息
}
}

View File

@ -14,15 +14,6 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
public class MqttConfig { public class MqttConfig {
/**
* Kafka
*/
public static final String BOOTSTRAP_SERVERS = "123.207.204.152:9092";
/**
* Kafka
*/
public static final String KAFKA_TOPIC = "test";
/** /**
* MQTT * MQTT
@ -32,12 +23,12 @@ public class MqttConfig {
/** /**
* MQTTID * MQTTID
*/ */
private static final String CLIENT_ID = "mqttx_049cd728"; private static final String CLIENT_ID = "mqttx_12345";
/** /**
* MQTT * MQTT
*/ */
private static final String TOPIC = "test";
/** /**
* MQTT * MQTT
@ -48,11 +39,13 @@ public class MqttConfig {
*/ */
@Bean @Bean
public MqttClient mqttClient () throws MqttException { public MqttClient mqttClient () throws MqttException {
MqttClient client = new MqttClient (BROKER, CLIENT_ID, new MemoryPersistence ()); MqttClient client = new MqttClient (BROKER, CLIENT_ID, new MemoryPersistence ());
MqttConnectOptions options = new MqttConnectOptions (); MqttConnectOptions options = new MqttConnectOptions ();
options.setCleanSession (true); options.setCleanSession (true);
client.connect (options); client.connect (options);
client.subscribe (TOPIC, 1); System.out.println ("开始连接");
return client; return client;
} }
} }

View File

@ -0,0 +1,289 @@
package com.zhn.decode.domain;
import lombok.*;
import java.math.BigDecimal;
import java.util.Date;
/**
* @author : Administrator
* @description :
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Builder
public class VehicleData {
/**
* VIN
*/
private String vin;
/**
*
*/
private Date createTime;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
//总电流
private String current;
//绝缘电阻
private String resistance;
//挡位
private String gear;
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
private String motorSpeed;
/**
*
*/
private String motorTorque;
/**
*
*/
private String motorTemperature;
/**
*
*/
private String motorVoltage;
/**
*
*/
private String motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private String maximumFeedbackPower;
/**
*
*/
private String maximumDischargePower;
/**
* BMS
*/
private String selfCheckCounter;
/**
*
*/
private String totalBatteryCurrent;
/**
* V3
*/
private String totalBatteryVoltage;
/**
*
*/
private String singleBatteryMaxVoltage;
/**
*
*/
private String singleBatteryMinVoltage;
/**
*
*/
private String singleBatteryMaxTemperature;
/**
*
*/
private String singleBatteryMinTemperature;
/**
*
*/
private String availableBatteryCapacity;
/**
*
*/
private int vehicleStatus;
/**
*
*/
private int chargingStatus;
/**
*
*/
private int operatingStatus;
/**
* SOC
*/
private int socStatus;
/**
*
*/
private int chargingEnergyStorageStatus;
/**
*
*/
private int driveMotorStatus;
/**
*
*/
private int positionStatus;
/**
* EAS()
*/
private int easStatus;
/**
* PTC()
*/
private int ptcStatus;
/**
* EPS()
*/
private int epsStatus;
/**
* ABS()
*/
private int absStatus;
/**
* MCU(/)
*/
private int mcuStatus;
/**
*
*/
private int heatingStatus;
/**
*
*/
private int batteryStatus;
/**
*
*/
private int batteryInsulationStatus;
public static VehicleData getBuild (String messages) {
char start = messages.charAt (0);
char end = messages.charAt (messages.length () - 1);
System.out.println (start);
System.out.println (end);
return VehicleData.builder ()
.vin (messages.substring (1, 18))
//messages.substring(18, 31)
.createTime (new Date ())
.longitude (messages.substring (31, 42))
.latitude (messages.substring (42, 52))
.speed (messages.substring (52, 58))
.mileage (new BigDecimal (messages.substring (58, 69)))
.voltage (messages.substring (69, 75))
.current (messages.substring (75, 80))
.resistance (messages.substring (80, 89))
.gear (messages.substring (89, 90))
.accelerationPedal (messages.substring (90, 92))
.brakePedal (messages.substring (92, 94))
.fuelConsumptionRate (messages.substring (94, 99))
.motorControllerTemperature (messages.substring (99, 105))
.motorSpeed (messages.substring (105, 110))
.motorTorque (messages.substring (110, 114))
.motorTemperature (messages.substring (114, 120))
.motorVoltage (messages.substring (120, 125))
.motorCurrent (messages.substring (125, 133))
.remainingBattery (new BigDecimal (messages.substring (133, 139)))
.maximumFeedbackPower (messages.substring (139, 145))
.maximumDischargePower (messages.substring (145, 151))
.selfCheckCounter (messages.substring (151, 153))
.totalBatteryCurrent (messages.substring (153, 158))
.totalBatteryVoltage (messages.substring (158, 164))
.singleBatteryMaxVoltage (messages.substring (164, 168))
.singleBatteryMinVoltage (messages.substring (168, 172))
.singleBatteryMaxTemperature (messages.substring (172, 178))
.singleBatteryMinTemperature (messages.substring (178, 184))
.availableBatteryCapacity (messages.substring (184, 190))
.vehicleStatus (Integer.valueOf (messages.substring (190, 191)))
.chargingStatus (Integer.valueOf (messages.substring (191, 192)))
.operatingStatus (Integer.valueOf (messages.substring (192, 193)))
.socStatus (Integer.valueOf (messages.substring (193, 194)))
.chargingEnergyStorageStatus (Integer.valueOf (messages.substring (194, 195)))
.driveMotorStatus (Integer.valueOf (messages.substring (195, 196)))
.positionStatus (Integer.valueOf (messages.substring (196, 197)))
.easStatus (Integer.valueOf (messages.substring (197, 198)))
.ptcStatus (Integer.valueOf (messages.substring (198, 199)))
.epsStatus (Integer.valueOf (messages.substring (199, 200)))
.absStatus (Integer.valueOf (messages.substring (200, 201)))
.mcuStatus (Integer.valueOf (messages.substring (201, 202)))
.heatingStatus (Integer.valueOf (messages.substring (202, 203)))
.batteryStatus (Integer.valueOf (messages.substring (203, 204)))
.batteryInsulationStatus (Integer.valueOf (messages.substring (204, 205)))
.build ();
}
}

View File

@ -0,0 +1,139 @@
package com.zhn.decode.service.impl;
import com.zhn.decode.domain.VehicleData;
import lombok.extern.log4j.Log4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@Service
@Log4j
public class ConsumerService {
// 定义 TiDB 连接信息
private static final String TIDBURL = "jdbc:mysql://123.207.204.152:4000/test";
private static final String USERNAME = "root";
private static final String PASSWORD = "123456";
@KafkaListener (topics = "mqttAA", groupId = "Fluxmq_consumer")
public void listen (ConsumerRecord<String, byte[]> record) {
String message = new String (record.value ());
StringBuilder sb = new StringBuilder ();
String[] arr = message.split (" ");
for (String s : arr) {
int ch = Integer.parseInt (s, 16);
sb.append ((char) ch);
}
VehicleData build = VehicleData.getBuild (sb.toString ());
try (Connection connection = DriverManager.getConnection (TIDBURL, USERNAME, PASSWORD)) {
String sql = """
INSERT INTO vehicle_data (
vin,
create_time,
longitude,
latitude,
speed,
mileage,
voltage,
current,
resistance,
gear,
acceleration_pedal,
brake_pedal,
fuel_consumption_rate,
motor_controller_temperature,
motor_speed,
motor_torque,
motor_temperature,
motor_voltage,
motor_current,
remaining_battery,
maximum_feedback_power,
maximum_discharge_power,
self_check_counter,
total_battery_current,
total_battery_voltage,
single_battery_max_voltage,
single_battery_min_voltage,
single_battery_max_temperature,
single_battery_min_temperature,
available_battery_capacity,
vehicle_status,
charging_status,
operating_status,
soc_status,
charging_energy_storage_status,
drive_motor_status,
position_status,
eas_status,
ptc_status,
eps_status,
abs_status,
mcu_status,
heating_status,
battery_status,
battery_insulation_status\s
)
VALUES
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""";
try (PreparedStatement preparedStatement = connection.prepareStatement (sql)) {
// 设置参数
preparedStatement.setString (1, build.getVin ());
preparedStatement.setDate (2, null);
preparedStatement.setString (3, build.getLongitude ());
preparedStatement.setString (4, build.getLatitude ());
preparedStatement.setString (5, build.getSpeed ());
preparedStatement.setBigDecimal (6, build.getMileage ());
preparedStatement.setString (7, build.getVoltage ());
preparedStatement.setString (8, build.getCurrent ());
preparedStatement.setString (9, build.getResistance ());
preparedStatement.setString (10, build.getGear ());
preparedStatement.setString (11, build.getAccelerationPedal ());
preparedStatement.setString (12, build.getBrakePedal ());
preparedStatement.setString (13, build.getFuelConsumptionRate ());
preparedStatement.setString (14, build.getMotorControllerTemperature ());
preparedStatement.setString (15, build.getMotorSpeed ());
preparedStatement.setString (16, build.getMotorTorque ());
preparedStatement.setString (17, build.getMotorTemperature ());
preparedStatement.setString (18, build.getMotorVoltage ());
preparedStatement.setString (19, build.getMotorCurrent ());
preparedStatement.setBigDecimal (20, build.getRemainingBattery ());
preparedStatement.setString (21, build.getMaximumFeedbackPower ());
preparedStatement.setString (22, build.getMaximumDischargePower ());
preparedStatement.setString (23, build.getSelfCheckCounter ());
preparedStatement.setString (24, build.getTotalBatteryCurrent ());
preparedStatement.setString (25, build.getTotalBatteryVoltage ());
preparedStatement.setString (26, build.getSingleBatteryMaxVoltage ());
preparedStatement.setString (27, build.getSingleBatteryMinVoltage ());
preparedStatement.setString (28, build.getSingleBatteryMaxTemperature ());
preparedStatement.setString (29, build.getSingleBatteryMinTemperature ());
preparedStatement.setString (30, build.getAvailableBatteryCapacity ());
preparedStatement.setInt (31, build.getVehicleStatus ());
preparedStatement.setInt (32, build.getChargingStatus ());
preparedStatement.setInt (33, build.getOperatingStatus ());
preparedStatement.setInt (34, build.getSocStatus ());
preparedStatement.setInt (35, build.getChargingEnergyStorageStatus ());
preparedStatement.setInt (36, build.getDriveMotorStatus ());
preparedStatement.setInt (37, build.getPositionStatus ());
preparedStatement.setInt (38, build.getEasStatus ());
preparedStatement.setInt (39, build.getPtcStatus ());
preparedStatement.setInt (40, build.getEpsStatus ());
preparedStatement.setInt (41, build.getAbsStatus ());
preparedStatement.setInt (42, build.getMcuStatus ());
preparedStatement.setInt (43, build.getHeatingStatus ());
preparedStatement.setInt (44, build.getBatteryStatus ());
preparedStatement.setInt (45, build.getBatteryInsulationStatus ());
// 执行插入操作
preparedStatement.executeUpdate ();
}
}
catch (SQLException e) {
System.out.println (e.getMessage ());
}
}
}

View File

@ -0,0 +1,59 @@
package com.zhn.decode.service.impl;
import com.zhn.decode.config.KafkaConfig;
import com.zhn.decode.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Arrays;
/**
* @author : Administrator
* @description :
*/
@Service
public class ShuntModuleService {
@Autowired
KafkaConfig kafkaClient;
@Autowired
MqttConfig mqttClient;
/** mqtt持续拉 */
@PostConstruct
public void mqttPush () throws MqttException {
MqttClient client = mqttClient.mqttClient ();
client.subscribe ("mqttAA", 1);
client.setCallback (new MqttCallback () {
/** 连接丢失 */
@Override
public void connectionLost (Throwable throwable) {
}
/** 消息已到达 */
@Override
public void messageArrived (String topic, MqttMessage mqttMessage) throws Exception {
/** kafka持续推 */
kafkaClient.kafkaTemplate ().send ("mqttAA", mqttMessage.getPayload ());
System.out.println (Arrays.toString (mqttMessage.getPayload ()));
}
/** 交货完成 */
@Override
public void deliveryComplete (IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
}

View File

@ -0,0 +1,29 @@
package com.zhn.decode.test;
import com.zhn.decode.domain.VehicleData;
/**
* @author : Administrator
* @Description :
*/
public class KafkaConsumerExample {
public static void main (String[] args) {
String hexString = "4c 4a 44 36 32 46 57 54 45 43 46 4d 37 46 41 54 31 31 37 30 30 39 39 38 31 33 31 36 38 31 31 31 36 2e 37 30 38 36 36 33 30 33 39 2e 35 31 37 33 32 38 30 35 34 2e 30 30 30 31 37 2e 30 30 30 30 30 30 30 30 33 37 38 30 30 30 31 35 30 30 30 31 37 30 39 39 30 30 30 30 44 31 30 37 30 39 2e 31 30 30 36 30 30 30 30 30 37 38 39 37 37 39 35 36 30 38 30 30 30 30 30 32 37 35 30 30 31 30 36 36 30 30 30 30 34 34 35 32 34 2e 32 31 30 30 30 30 38 35 30 30 30 30 30 30 31 30 30 30 30 34 35 32 30 30 30 34 30 30 30 34 30 30 30 39 39 30 30 30 30 32 32 30 30 30 30 36 34 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 9a";
StringBuilder sb = new StringBuilder ();
String[] arr = hexString.split (" ");
int length = arr.length;
for (String s : arr) {
int ch = Integer.parseInt (s, 16);
sb.append ((char) ch);
}
System.out.println (sb.toString ());
VehicleData build = VehicleData.getBuild (sb.toString ());
System.out.println (build.toString ());
}
}

View File

@ -1,25 +0,0 @@
server.port=8080
# 应用名称
spring.application.name=mqtt-decode
# Spring Kafka配置
spring.kafka.bootstrap-servers=123.207.204.152:9092
num.partitions=1 #默认Topic分区数
num.replica.fetchers=1 #默认副本数
# Producer配置
spring.kafka.producer.transaction-id-prefix=kafka_tx.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.group-id=${group.id}
spring.kafka.consumer.auto-offset-reset=earliest
# MQTT配置
mqtt.host=tcp://localhost:1883
mqtt.username=my_username
mqtt.password=my_password
mqtt.topic=my_mqtt_topic
# 其他配置
spring.mvc.path-matching.matching-strategy=prefix

View File

@ -0,0 +1,12 @@
server:
port: 8066
spring:
application:
name: "mqtt-decode"
kafka:
bootstrap-servers: "123.207.204.152:9092"
consumer:
group-id: "Fluxmq_consumer"