feat: 新增根据vin获取kafka中的数据

master
yaoxin 2024-06-17 22:33:33 +08:00
parent a601df32fa
commit 9459bb2576
5 changed files with 347 additions and 0 deletions

20
pom.xml
View File

@ -21,6 +21,26 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- SpringBoot Boot Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>

View File

@ -0,0 +1,53 @@
package com.muyu.eventdriven.consumer;
import com.alibaba.fastjson.JSON;
import com.muyu.eventdriven.domain.VehicleKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @ClassName KafkaConsumers
* @Description
* @Author Xin.Yao
* @Date 2024/6/9 9:54
*/
@Component
public class KafkaConsumers {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private RedisTemplate<String,String> redisTemplate;
public KafkaConsumer kafkaConsumer(String vin){
Object o = redisTemplate.opsForHash().get("vehicleKafka", vin);
VehicleKafka vehicleKafka = JSON.parseObject(o.toString(), VehicleKafka.class);
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 指定分区策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 指定消费者组,必须参数
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题分区
List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition(vehicleKafka.getConsumerName(), vehicleKafka.getPartitions()));
consumer.assign(topicPartitions);
return consumer;
}
}

View File

@ -0,0 +1,247 @@
package com.muyu.eventdriven.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.math.BigDecimal;
/**
* @ClassName VehicleData
* @Description
* @Author Xin.Yao
* @Date 2024/6/5 6:52
*/
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class VehicleData {
/**
* VIN
*/
private String vin;
/**
* 线
*/
private String drivingRoute;
/**
*
*/
private String longitude;
/**
*
*/
private String latitude;
/**
*
*/
private String speed;
/**
*
*/
private BigDecimal mileage;
/**
*
*/
private String voltage;
/**
*
*/
private String current;
/**
*
*/
private String resistance;
/**
*
*/
private String gear = "P";
/**
*
*/
private String accelerationPedal;
/**
*
*/
private String brakePedal;
/**
*
*/
private String fuelConsumptionRate;
/**
*
*/
private String motorControllerTemperature;
/**
*
*/
private String motorSpeed;
/**
*
*/
private String motorTorque;
/**
*
*/
private String motorTemperature;
/**
*
*/
private String motorVoltage;
/**
*
*/
private String motorCurrent;
/**
* SOC
*/
private BigDecimal remainingBattery;
/**
*
*/
private BigDecimal batteryLevel;
/**
*
*/
private String maximumFeedbackPower;
/**
*
*/
private String maximumDischargePower;
/**
* BMS
*/
private String selfCheckCounter;
/**
*
*/
private String totalBatteryCurrent;
/**
* V3
*/
private String totalBatteryVoltage;
/**
*
*/
private String singleBatteryMaxVoltage;
/**
*
*/
private String singleBatteryMinVoltage;
/**
*
*/
private String singleBatteryMaxTemperature;
/**
*
*/
private String singleBatteryMinTemperature;
/**
*
*/
private String availableBatteryCapacity;
/**
*
*/
private int vehicleStatus = 1;
/**
*
*/
private int chargingStatus = 1;
/**
*
*/
private int operatingStatus = 1;
/**
* SOC
*/
private int socStatus = 1;
/**
*
*/
private int chargingEnergyStorageStatus = 1;
/**
*
*/
private int driveMotorStatus = 1;
/**
*
*/
private int positionStatus = 1;
/**
* EAS()
*/
private int easStatus = 1;
/**
* PTC()
*/
private int ptcStatus = 1;
/**
* EPS()
*/
private int epsStatus = 1;
/**
* ABS()
*/
private int absStatus = 1;
/**
* MCU(/)
*/
private int mcuStatus = 1;
/**
*
*/
private int heatingStatus = 1;
/**
*
*/
private int batteryStatus = 1;
/**
*
*/
private int batteryInsulationStatus = 1;
/**
* DCDC()
*/
private int dcdcStatus = 1;
/**
* CHG()
*/
private int chgStatus = 1;
}

View File

@ -0,0 +1,16 @@
package com.muyu.eventdriven.domain;
import lombok.Data;
/**
* @ClassName Test
* @Description
* @Author Xin.Yao
* @Date 2024/6/9 10:56
*/
@Data
public class VehicleKafka {
private Integer partitions;
private String key;
private String consumerName;
}

View File

@ -2,6 +2,17 @@ server:
port: 9006
spring:
kafka:
#config/consumer.properties配置的bootstrap.servers
bootstrap-servers: 47.98.170.220:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#这个可以和config/consumer.properties里的group.id不同
group-id: test-consumer-group
redis:
host: 127.0.0.1
port: 6379