diff --git a/src/main/java/com/zhn/decode/MqttDecodeMain.java b/src/main/java/com/zhn/decode/ShuntModuleMain.java similarity index 67% rename from src/main/java/com/zhn/decode/MqttDecodeMain.java rename to src/main/java/com/zhn/decode/ShuntModuleMain.java index 9daaf20..2fa64da 100644 --- a/src/main/java/com/zhn/decode/MqttDecodeMain.java +++ b/src/main/java/com/zhn/decode/ShuntModuleMain.java @@ -5,12 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author : Administrator - * @Description : mqtt解密主函数 + * @Description : 分流模组 */ @SpringBootApplication -public class MqttDecodeMain { +public class ShuntModuleMain { public static void main (String[] args) { - SpringApplication.run (MqttDecodeMain.class, args); + SpringApplication.run (ShuntModuleMain.class, args); } + } diff --git a/src/main/java/com/zhn/decode/config/KafkaConfig.java b/src/main/java/com/zhn/decode/config/KafkaConfig.java index dcf430f..ddee02c 100644 --- a/src/main/java/com/zhn/decode/config/KafkaConfig.java +++ b/src/main/java/com/zhn/decode/config/KafkaConfig.java @@ -8,10 +8,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.ContainerProperties.AckMode; -import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -22,49 +20,68 @@ import java.util.Map; @EnableKafka public class KafkaConfig { - @Value ("${spring.kafka.bootstrap-servers}") + public static final String KAFKA_ADMIN_CLIENT_ID = "mqtt_to_kafka"; + @Value ("${kafka.bootstrap-servers}") private String bootstrapServers; - - @Value ("${spring.kafka.consumer.group-id}") + @Value ("${kafka.consumer.group-id}") private String groupId; + /** + * KafkaTemplate 配置 + * + * @return KafkaTemplate 对象 + */ @Bean - public KafkaAdmin kafkaAdmin () { - Map configs = new HashMap<> (); - configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - return new KafkaAdmin (configs); + public KafkaTemplate kafkaTemplate () { + // 返回新的 KafkaTemplate 对象 + ProducerFactory prop = producerFactory (); + return new KafkaTemplate<> (prop); } + /** + * 生产者工厂配置 + * + * @return ProducerFactory 对象 + */ @Bean public ProducerFactory producerFactory () { + // 创建 Kafka 生产者配置 Map configs = new HashMap<> (); configs.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put (ProducerConfig.CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID); configs.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + // 增加 max.poll.interval.ms 配置 + configs.put (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 设置为 5 分钟(单位为毫秒) + + // 减小 max.poll.records 配置 + configs.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 设置每次 poll 返回的最大记录数为 100 条 + // 返回新的 DefaultKafkaProducerFactory 对象 return new DefaultKafkaProducerFactory<> (configs); } @Bean - public KafkaTemplate kafkaTemplate (ProducerFactory producerFactory) { - return new KafkaTemplate<> (producerFactory); - } - - @Bean - public KafkaMessageListenerContainer messageListenerContainer () { - ContainerProperties containerProps = new ContainerProperties ("my_topic_name"); - containerProps.setAckMode (AckMode.MANUAL_IMMEDIATE); - containerProps.setMessageListener (new KafkaMessageListener ()); - return new KafkaMessageListenerContainer<> (consumerFactory (), containerProps); + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<> (); + factory.setConsumerFactory (consumerFactory ()); + return factory; } + /** + * 消费者工厂配置 + * + * @return ConsumerFactory 对象 + */ @Bean public ConsumerFactory consumerFactory () { + // 创建 Kafka 消费者配置 Map props = new HashMap<> (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put (JsonDeserializer.TRUSTED_PACKAGES, "*"); + // 返回新的 DefaultKafkaConsumerFactory 对象 return new DefaultKafkaConsumerFactory<> (props); } diff --git a/src/main/java/com/zhn/decode/config/KafkaMessageListener.java b/src/main/java/com/zhn/decode/config/KafkaMessageListener.java deleted file mode 100644 index aa451d1..0000000 --- a/src/main/java/com/zhn/decode/config/KafkaMessageListener.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.zhn.decode.config; - -/** - * @author : Administrator - * @description : kafka消息监听实现 - */ - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.listener.MessageListener; - -public class KafkaMessageListener implements MessageListener { - - /** - * 消费者记录 - * - * @param record - */ - @Override - public void onMessage (ConsumerRecord record) { - // 获取消息的键 - String key = record.key (); - // 获取消息的值 - Object value = record.value (); - - // 在这里处理接收到的消息 - - - } -} diff --git a/src/main/java/com/zhn/decode/config/MqttConfig.java b/src/main/java/com/zhn/decode/config/MqttConfig.java index b587dc9..cf0c207 100644 --- a/src/main/java/com/zhn/decode/config/MqttConfig.java +++ b/src/main/java/com/zhn/decode/config/MqttConfig.java @@ -14,15 +14,6 @@ import org.springframework.context.annotation.Configuration; */ @Configuration public class MqttConfig { - /** - * Kafka代理服务器地址和端口。 - */ - public static final String BOOTSTRAP_SERVERS = "123.207.204.152:9092"; - - /** - * Kafka主题名称。 - */ - public static final String KAFKA_TOPIC = "test"; /** * MQTT代理服务器地址和端口。 @@ -32,12 +23,12 @@ public class MqttConfig { /** * MQTT客户端ID。 */ - private static final String CLIENT_ID = "mqttx_049cd728"; + private static final String CLIENT_ID = "mqttx_12345"; /** * MQTT主题名称。 */ - private static final String TOPIC = "test"; + /** * 创建并配置MQTT客户端。 @@ -48,11 +39,13 @@ public class MqttConfig { */ @Bean public MqttClient mqttClient () throws MqttException { + MqttClient client = new MqttClient (BROKER, CLIENT_ID, new MemoryPersistence ()); MqttConnectOptions options = new MqttConnectOptions (); options.setCleanSession (true); client.connect (options); - client.subscribe (TOPIC, 1); + System.out.println ("开始连接"); + return client; } } diff --git a/src/main/java/com/zhn/decode/domain/VehicleData.java b/src/main/java/com/zhn/decode/domain/VehicleData.java new file mode 100644 index 0000000..5367da2 --- /dev/null +++ b/src/main/java/com/zhn/decode/domain/VehicleData.java @@ -0,0 +1,289 @@ +package com.zhn.decode.domain; + +import lombok.*; + +import java.math.BigDecimal; +import java.util.Date; + +/** + * @author : Administrator + * @description : 车辆数据 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@ToString +@Builder +public class VehicleData { + /** + * VIN + */ + private String vin; + + /** + * 时间戳 + */ + private Date createTime; + + /** + * 经度 + */ + private String longitude; + + /** + * 维度 + */ + private String latitude; + + /** + * 速度 + */ + private String speed; + + /** + * 里程 + */ + private BigDecimal mileage; + /** + * 总电压 + */ + private String voltage; + //总电流 + private String current; + //绝缘电阻 + private String resistance; + //挡位 + private String gear; + /** + * 加速踏板行程值 + */ + private String accelerationPedal; + + /** + * 制动踏板行程值 + */ + private String brakePedal; + + /** + * 燃料消耗率 + */ + private String fuelConsumptionRate; + + /** + * 电机控制器温度 + */ + private String motorControllerTemperature; + + /** + * 电机转速 + */ + private String motorSpeed; + + /** + * 电机转矩 + */ + private String motorTorque; + + /** + * 电机温度 + */ + private String motorTemperature; + + /** + * 电机电压 + */ + private String motorVoltage; + + /** + * 电机电流 + */ + private String motorCurrent; + + /** + * 动力电池剩余电量SOC + */ + private BigDecimal remainingBattery; + + + /** + * 当前状态允许的最大反馈功率 + */ + private String maximumFeedbackPower; + + /** + * 当前状态允许最大放电功率 + */ + private String maximumDischargePower; + + /** + * BMS自检计数器 + */ + private String selfCheckCounter; + + /** + * 动力电池充放电电流 + */ + private String totalBatteryCurrent; + + /** + * 动力电池负载端总电压V3 + */ + private String totalBatteryVoltage; + + /** + * 单次最大电压 + */ + private String singleBatteryMaxVoltage; + + /** + * 单体电池最低电压 + */ + private String singleBatteryMinVoltage; + + /** + * 单体电池最高温度 + */ + private String singleBatteryMaxTemperature; + + /** + * 单体电池最低温度 + */ + private String singleBatteryMinTemperature; + + /** + * 动力电池可用容量 + */ + private String availableBatteryCapacity; + + /** + * 车辆状态 + */ + private int vehicleStatus; + + /** + * 充电状态 + */ + private int chargingStatus; + + /** + * 运行状态 + */ + private int operatingStatus; + + /** + * SOC + */ + private int socStatus; + + /** + * 可充电储能装置工作状态 + */ + private int chargingEnergyStorageStatus; + + /** + * 驱动电机状态 + */ + private int driveMotorStatus; + + /** + * 定位是否有效 + */ + private int positionStatus; + + /** + * EAS(汽车防盗系统)状态 + */ + private int easStatus; + + /** + * PTC(电动加热器)状态 + */ + private int ptcStatus; + + /** + * EPS(电动助力系统)状态 + */ + private int epsStatus; + + /** + * ABS(防抱死)状态 + */ + private int absStatus; + + /** + * MCU(电机/逆变器)状态 + */ + private int mcuStatus; + + /** + * 动力电池加热状态 + */ + private int heatingStatus; + + /** + * 动力电池当前状态 + */ + private int batteryStatus; + + /** + * 动力电池保温状态 + */ + private int batteryInsulationStatus; + + + public static VehicleData getBuild (String messages) { + char start = messages.charAt (0); + char end = messages.charAt (messages.length () - 1); + System.out.println (start); + System.out.println (end); + return VehicleData.builder () + .vin (messages.substring (1, 18)) + //messages.substring(18, 31) + .createTime (new Date ()) + .longitude (messages.substring (31, 42)) + .latitude (messages.substring (42, 52)) + .speed (messages.substring (52, 58)) + .mileage (new BigDecimal (messages.substring (58, 69))) + .voltage (messages.substring (69, 75)) + .current (messages.substring (75, 80)) + .resistance (messages.substring (80, 89)) + .gear (messages.substring (89, 90)) + .accelerationPedal (messages.substring (90, 92)) + .brakePedal (messages.substring (92, 94)) + .fuelConsumptionRate (messages.substring (94, 99)) + .motorControllerTemperature (messages.substring (99, 105)) + .motorSpeed (messages.substring (105, 110)) + .motorTorque (messages.substring (110, 114)) + .motorTemperature (messages.substring (114, 120)) + .motorVoltage (messages.substring (120, 125)) + .motorCurrent (messages.substring (125, 133)) + .remainingBattery (new BigDecimal (messages.substring (133, 139))) + .maximumFeedbackPower (messages.substring (139, 145)) + .maximumDischargePower (messages.substring (145, 151)) + .selfCheckCounter (messages.substring (151, 153)) + .totalBatteryCurrent (messages.substring (153, 158)) + .totalBatteryVoltage (messages.substring (158, 164)) + .singleBatteryMaxVoltage (messages.substring (164, 168)) + .singleBatteryMinVoltage (messages.substring (168, 172)) + .singleBatteryMaxTemperature (messages.substring (172, 178)) + .singleBatteryMinTemperature (messages.substring (178, 184)) + .availableBatteryCapacity (messages.substring (184, 190)) + .vehicleStatus (Integer.valueOf (messages.substring (190, 191))) + .chargingStatus (Integer.valueOf (messages.substring (191, 192))) + .operatingStatus (Integer.valueOf (messages.substring (192, 193))) + .socStatus (Integer.valueOf (messages.substring (193, 194))) + .chargingEnergyStorageStatus (Integer.valueOf (messages.substring (194, 195))) + .driveMotorStatus (Integer.valueOf (messages.substring (195, 196))) + .positionStatus (Integer.valueOf (messages.substring (196, 197))) + .easStatus (Integer.valueOf (messages.substring (197, 198))) + .ptcStatus (Integer.valueOf (messages.substring (198, 199))) + .epsStatus (Integer.valueOf (messages.substring (199, 200))) + .absStatus (Integer.valueOf (messages.substring (200, 201))) + .mcuStatus (Integer.valueOf (messages.substring (201, 202))) + .heatingStatus (Integer.valueOf (messages.substring (202, 203))) + .batteryStatus (Integer.valueOf (messages.substring (203, 204))) + .batteryInsulationStatus (Integer.valueOf (messages.substring (204, 205))) + .build (); + + } +} diff --git a/src/main/java/com/zhn/decode/service/impl/ConsumerService.java b/src/main/java/com/zhn/decode/service/impl/ConsumerService.java new file mode 100644 index 0000000..63f4385 --- /dev/null +++ b/src/main/java/com/zhn/decode/service/impl/ConsumerService.java @@ -0,0 +1,139 @@ +package com.zhn.decode.service.impl; + +import com.zhn.decode.domain.VehicleData; +import lombok.extern.log4j.Log4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +@Service +@Log4j +public class ConsumerService { + // 定义 TiDB 连接信息 + private static final String TIDBURL = "jdbc:mysql://123.207.204.152:4000/test"; + private static final String USERNAME = "root"; + private static final String PASSWORD = "123456"; + + @KafkaListener (topics = "mqttAA", groupId = "Fluxmq_consumer") + public void listen (ConsumerRecord record) { + String message = new String (record.value ()); + StringBuilder sb = new StringBuilder (); + String[] arr = message.split (" "); + for (String s : arr) { + int ch = Integer.parseInt (s, 16); + sb.append ((char) ch); + } + VehicleData build = VehicleData.getBuild (sb.toString ()); + try (Connection connection = DriverManager.getConnection (TIDBURL, USERNAME, PASSWORD)) { + String sql = """ + INSERT INTO vehicle_data ( + vin, + create_time, + longitude, + latitude, + speed, + mileage, + voltage, + current, + resistance, + gear, + acceleration_pedal, + brake_pedal, + fuel_consumption_rate, + motor_controller_temperature, + motor_speed, + motor_torque, + motor_temperature, + motor_voltage, + motor_current, + remaining_battery, + maximum_feedback_power, + maximum_discharge_power, + self_check_counter, + total_battery_current, + total_battery_voltage, + single_battery_max_voltage, + single_battery_min_voltage, + single_battery_max_temperature, + single_battery_min_temperature, + available_battery_capacity, + vehicle_status, + charging_status, + operating_status, + soc_status, + charging_energy_storage_status, + drive_motor_status, + position_status, + eas_status, + ptc_status, + eps_status, + abs_status, + mcu_status, + heating_status, + battery_status, + battery_insulation_status\s + ) + VALUES + (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""; + try (PreparedStatement preparedStatement = connection.prepareStatement (sql)) { + // 设置参数 + preparedStatement.setString (1, build.getVin ()); + preparedStatement.setDate (2, null); + preparedStatement.setString (3, build.getLongitude ()); + preparedStatement.setString (4, build.getLatitude ()); + preparedStatement.setString (5, build.getSpeed ()); + preparedStatement.setBigDecimal (6, build.getMileage ()); + preparedStatement.setString (7, build.getVoltage ()); + preparedStatement.setString (8, build.getCurrent ()); + preparedStatement.setString (9, build.getResistance ()); + preparedStatement.setString (10, build.getGear ()); + preparedStatement.setString (11, build.getAccelerationPedal ()); + preparedStatement.setString (12, build.getBrakePedal ()); + preparedStatement.setString (13, build.getFuelConsumptionRate ()); + preparedStatement.setString (14, build.getMotorControllerTemperature ()); + preparedStatement.setString (15, build.getMotorSpeed ()); + preparedStatement.setString (16, build.getMotorTorque ()); + preparedStatement.setString (17, build.getMotorTemperature ()); + preparedStatement.setString (18, build.getMotorVoltage ()); + preparedStatement.setString (19, build.getMotorCurrent ()); + preparedStatement.setBigDecimal (20, build.getRemainingBattery ()); + preparedStatement.setString (21, build.getMaximumFeedbackPower ()); + preparedStatement.setString (22, build.getMaximumDischargePower ()); + preparedStatement.setString (23, build.getSelfCheckCounter ()); + preparedStatement.setString (24, build.getTotalBatteryCurrent ()); + preparedStatement.setString (25, build.getTotalBatteryVoltage ()); + preparedStatement.setString (26, build.getSingleBatteryMaxVoltage ()); + preparedStatement.setString (27, build.getSingleBatteryMinVoltage ()); + preparedStatement.setString (28, build.getSingleBatteryMaxTemperature ()); + preparedStatement.setString (29, build.getSingleBatteryMinTemperature ()); + preparedStatement.setString (30, build.getAvailableBatteryCapacity ()); + preparedStatement.setInt (31, build.getVehicleStatus ()); + preparedStatement.setInt (32, build.getChargingStatus ()); + preparedStatement.setInt (33, build.getOperatingStatus ()); + preparedStatement.setInt (34, build.getSocStatus ()); + preparedStatement.setInt (35, build.getChargingEnergyStorageStatus ()); + preparedStatement.setInt (36, build.getDriveMotorStatus ()); + preparedStatement.setInt (37, build.getPositionStatus ()); + preparedStatement.setInt (38, build.getEasStatus ()); + preparedStatement.setInt (39, build.getPtcStatus ()); + preparedStatement.setInt (40, build.getEpsStatus ()); + preparedStatement.setInt (41, build.getAbsStatus ()); + preparedStatement.setInt (42, build.getMcuStatus ()); + preparedStatement.setInt (43, build.getHeatingStatus ()); + preparedStatement.setInt (44, build.getBatteryStatus ()); + preparedStatement.setInt (45, build.getBatteryInsulationStatus ()); + + // 执行插入操作 + preparedStatement.executeUpdate (); + } + } + catch (SQLException e) { + System.out.println (e.getMessage ()); + } + } +} diff --git a/src/main/java/com/zhn/decode/service/impl/ShuntModuleService.java b/src/main/java/com/zhn/decode/service/impl/ShuntModuleService.java new file mode 100644 index 0000000..b88d406 --- /dev/null +++ b/src/main/java/com/zhn/decode/service/impl/ShuntModuleService.java @@ -0,0 +1,59 @@ +package com.zhn.decode.service.impl; + +import com.zhn.decode.config.KafkaConfig; +import com.zhn.decode.config.MqttConfig; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Arrays; + +/** + * @author : Administrator + * @description : 分流平台服务 + */ +@Service +public class ShuntModuleService { + + @Autowired + KafkaConfig kafkaClient; + @Autowired + MqttConfig mqttClient; + + /** mqtt持续拉 */ + @PostConstruct + public void mqttPush () throws MqttException { + MqttClient client = mqttClient.mqttClient (); + client.subscribe ("mqttAA", 1); + client.setCallback (new MqttCallback () { + /** 连接丢失 */ + @Override + public void connectionLost (Throwable throwable) { + + } + + /** 消息已到达 */ + + @Override + public void messageArrived (String topic, MqttMessage mqttMessage) throws Exception { + /** kafka持续推 */ + + kafkaClient.kafkaTemplate ().send ("mqttAA", mqttMessage.getPayload ()); + System.out.println (Arrays.toString (mqttMessage.getPayload ())); + + } + + /** 交货完成 */ + + @Override + public void deliveryComplete (IMqttDeliveryToken iMqttDeliveryToken) { + + } + }); + + + } + + +} diff --git a/src/main/java/com/zhn/decode/test/KafkaConsumerExample.java b/src/main/java/com/zhn/decode/test/KafkaConsumerExample.java new file mode 100644 index 0000000..20887d5 --- /dev/null +++ b/src/main/java/com/zhn/decode/test/KafkaConsumerExample.java @@ -0,0 +1,29 @@ +package com.zhn.decode.test; + + +import com.zhn.decode.domain.VehicleData; + +/** + * @author : Administrator + * @Description : 消费者测试 + */ +public class KafkaConsumerExample { + public static void main (String[] args) { + String hexString = "4c 4a 44 36 32 46 57 54 45 43 46 4d 37 46 41 54 31 31 37 30 30 39 39 38 31 33 31 36 38 31 31 31 36 2e 37 30 38 36 36 33 30 33 39 2e 35 31 37 33 32 38 30 35 34 2e 30 30 30 31 37 2e 30 30 30 30 30 30 30 30 33 37 38 30 30 30 31 35 30 30 30 31 37 30 39 39 30 30 30 30 44 31 30 37 30 39 2e 31 30 30 36 30 30 30 30 30 37 38 39 37 37 39 35 36 30 38 30 30 30 30 30 32 37 35 30 30 31 30 36 36 30 30 30 30 34 34 35 32 34 2e 32 31 30 30 30 30 38 35 30 30 30 30 30 30 31 30 30 30 30 34 35 32 30 30 30 34 30 30 30 34 30 30 30 39 39 30 30 30 30 32 32 30 30 30 30 36 34 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 9a"; + StringBuilder sb = new StringBuilder (); + String[] arr = hexString.split (" "); + int length = arr.length; + for (String s : arr) { + int ch = Integer.parseInt (s, 16); + sb.append ((char) ch); + } + System.out.println (sb.toString ()); + + + VehicleData build = VehicleData.getBuild (sb.toString ()); + + + System.out.println (build.toString ()); + + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index a89bb1a..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1,25 +0,0 @@ -server.port=8080 -# 应用名称 -spring.application.name=mqtt-decode -# Spring Kafka配置 -spring.kafka.bootstrap-servers=123.207.204.152:9092 -num.partitions=1 #默认Topic分区数 -num.replica.fetchers=1 #默认副本数 -# Producer配置 -spring.kafka.producer.transaction-id-prefix=kafka_tx. -spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer -spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer -# Consumer配置 -spring.kafka.consumer.enable-auto-commit=false -spring.kafka.listener.ack-mode=manual -spring.kafka.consumer.group-id=${group.id} -spring.kafka.consumer.auto-offset-reset=earliest -# MQTT配置 -mqtt.host=tcp://localhost:1883 -mqtt.username=my_username -mqtt.password=my_password -mqtt.topic=my_mqtt_topic -# 其他配置 -spring.mvc.path-matching.matching-strategy=prefix - - diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..c6071e6 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,12 @@ +server: + port: 8066 + +spring: + application: + name: "mqtt-decode" +kafka: + bootstrap-servers: "123.207.204.152:9092" + consumer: + group-id: "Fluxmq_consumer" + +