diff --git a/Dockerfile b/Dockerfile index bc645d9..2ece798 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ EXPOSE 9801 VOLUME /home/logs/god-data-server # 复制jar文件到docker内部 -COPY /car-data-server/target/car-data-server.jar /home/app.jar +COPY /target/car-data-data.jar /home/app.jar #工作目录 exec -it 进来默认就是这个目 WORKDIR /home diff --git a/car-data-common/pom.xml b/car-data-common/pom.xml deleted file mode 100644 index c92d9e0..0000000 --- a/car-data-common/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ - - - 4.0.0 - - com.god - god-car-data - 3.6.3 - - - car-data-common - - - 17 - 17 - UTF-8 - - - - - - - - com.god - god-common-datasource - - - - - com.god - god-common-datascope - - - - - com.god - god-common-log - - - - - com.god - god-common-swagger - - - - - - diff --git a/car-data-remote/pom.xml b/car-data-remote/pom.xml deleted file mode 100644 index 0482aec..0000000 --- a/car-data-remote/pom.xml +++ /dev/null @@ -1,28 +0,0 @@ - - - 4.0.0 - - com.god - god-car-data - 3.6.3 - - - car-data-remote - - - 17 - 17 - UTF-8 - - - - - com.god - car-data-common - 3.6.3 - - - - \ No newline at end of file diff --git a/car-data-server/pom.xml b/car-data-server/pom.xml deleted file mode 100644 index c325d20..0000000 --- a/car-data-server/pom.xml +++ /dev/null @@ -1,115 +0,0 @@ - - - 4.0.0 - - com.god - god-car-data - 3.6.3 - - - car-data-server - - - 17 - 17 - UTF-8 - - - - - - org.springframework.kafka - spring-kafka - - - org.apache.kafka - kafka-clients - - - - - org.apache.kafka - kafka-clients - - - - com.god - car-data-common - 3.6.3 - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-discovery - - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-config - - - - - com.alibaba.cloud - spring-cloud-starter-alibaba-sentinel - - - - - org.springframework.boot - spring-boot-starter-actuator - - - - - io.springfox - springfox-swagger-ui - ${swagger.fox.version} - - - - - com.mysql - mysql-connector-j - - - junit - junit - - - org.junit.jupiter - 
junit-jupiter - - - - - - - ${project.artifactId} - - - org.springframework.boot - spring-boot-maven-plugin - - - - repackage - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - - - - - diff --git a/car-data-server/src/main/java/com/god/data/server/config/KafkaConsumerConfig.java b/car-data-server/src/main/java/com/god/data/server/config/KafkaConsumerConfig.java deleted file mode 100644 index 9012956..0000000 --- a/car-data-server/src/main/java/com/god/data/server/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.god.data.server.config; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.support.serializer.JsonDeserializer; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author 李帆 - * @date 2022/10/31 18:05 - * kafka配置,也可以写在yml,这个文件会覆盖yml - */ -@SpringBootConfiguration -public class KafkaConsumerConfig { - - @Value("${spring.kafka.consumer.bootstrap-servers}") - private String bootstrapServers; - @Value("${spring.kafka.consumer.group-id}") - private String groupId; - @Value("${spring.kafka.consumer.enable-auto-commit}") - private boolean enableAutoCommit; - @Value("${spring.kafka.properties.session.timeout.ms}") - private String sessionTimeout; - @Value("${spring.kafka.properties.max.poll.interval.ms}") - private String maxPollIntervalTime; - 
@Value("${spring.kafka.consumer.max-poll-records}") - private String maxPollRecords; - @Value("${spring.kafka.consumer.auto-offset-reset}") - private String autoOffsetReset; - @Value("${spring.kafka.listener.concurrency}") - private Integer concurrency; - @Value("${spring.kafka.listener.missing-topics-fatal}") - private boolean missingTopicsFatal; - @Value("${spring.kafka.listener.poll-timeout}") - private long pollTimeout; - - @Bean - public Map consumerConfigs() { - - Map propsMap = new HashMap<>(16); - propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 - propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); - //自动提交的时间间隔,自动提交开启时生效 - propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); - //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: - //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 - //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) - //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 - propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); - //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance - propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); - //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 - //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, - //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, - //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 - //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 - //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 - propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); - //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s - propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); - //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) - propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
JsonDeserializer.class); - propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - return propsMap; - } - - @Bean - public ConsumerFactory consumerFactory() { - // 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 - try (JsonDeserializer deserializer = new JsonDeserializer<>()) { - deserializer.trustedPackages("*"); - return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); - } - } - - /** - * KafkaListenerContainerFactory是Spring Kafka提供的用于创建KafkaListenerContainer的工厂类。 - * KafkaListenerContainer是一个用于消费Kafka消息的容器,它封装了Kafka的消费者API,提供了更加方便的使用方式。 - * KafkaListenerContainerFactory可以配置KafkaListenerContainer的一些属性,如消费者的个数、批量消费的大小、消费者的超时时间等。 - * 在Spring Kafka中,可以通过配置KafkaListenerContainerFactory来创建KafkaListenerContainer,从而实现对Kafka消息的消费 - * @return - */ - @Bean - public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 - factory.setConcurrency(concurrency); - // 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 - factory.setMissingTopicsFatal(missingTopicsFatal); - // 自动提交关闭,需要设置手动消息确认 - factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); - factory.getContainerProperties().setPollTimeout(pollTimeout); - // 设置为批量监听,需要用List接收 - // factory.setBatchListener(true); - return factory; - } - -} diff --git a/car-data-server/src/main/java/com/god/data/server/config/KafkaProviderConfig.java b/car-data-server/src/main/java/com/god/data/server/config/KafkaProviderConfig.java deleted file mode 100644 index 0b163a9..0000000 --- a/car-data-server/src/main/java/com/god/data/server/config/KafkaProviderConfig.java +++ /dev/null @@ -1,89 +0,0 @@ -package com.god.data.server.config; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.springframework.beans.factory.annotation.Value; -import 
org.springframework.boot.SpringBootConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.transaction.KafkaTransactionManager; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.HashMap; -import java.util.Map; - -/** - * kafka 消息的提供者 配置类 - */ -@SpringBootConfiguration -public class KafkaProviderConfig { - - @Value("${spring.kafka.producer.bootstrap-servers}") - private String bootstrapServers; - @Value("${spring.kafka.producer.transaction-id-prefix}") - private String transactionIdPrefix; - @Value("${spring.kafka.producer.acks}") - private String acks; - @Value("${spring.kafka.producer.retries}") - private String retries; - @Value("${spring.kafka.producer.batch-size}") - private String batchSize; - @Value("${spring.kafka.producer.buffer-memory}") - private String bufferMemory; - - /** - * 构建了 Map 存放了 Kafka 生产者的 配置信息 - * @return - */ - @Bean - public Map producerConfigs() { - Map props = new HashMap<>(16); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 - //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 - //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 - //开启事务必须设为all - props.put(ProducerConfig.ACKS_CONFIG, acks); - //发生错误后,消息重发的次数,开启事务必须大于0 - props.put(ProducerConfig.RETRIES_CONFIG, retries); - //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送 - //批次的大小可以通过batch.size 参数设置.默认是16KB - //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 - //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟 - //实测batchSize这个参数没有用 - props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); - //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 
再设置一个时间,到了这个时间, - //即使数据没达到16KB,也将这个批次发送出去 - props.put(ProducerConfig.LINGER_MS_CONFIG, "5000"); - //生产者内存缓冲区的大小 - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); - //反序列化,和生产者的序列化方式对应 - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - return props; - } - - /** - * 生产者工厂 - * @return - */ - @Bean - public ProducerFactory producerFactory() { - DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); - //开启事务,会导致 LINGER_MS_CONFIG 配置失效 - factory.setTransactionIdPrefix(transactionIdPrefix); - return factory; - } - - @Bean - public KafkaTransactionManager kafkaTransactionManager(ProducerFactory producerFactory) { - return new KafkaTransactionManager<>(producerFactory); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - -} diff --git a/car-data-server/src/main/java/com/god/data/server/controller/KafkaController.java b/car-data-server/src/main/java/com/god/data/server/controller/KafkaController.java deleted file mode 100644 index 5ae8b1d..0000000 --- a/car-data-server/src/main/java/com/god/data/server/controller/KafkaController.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.god.data.server.controller; - -import com.god.common.core.domain.Result; -import com.god.data.server.config.KafkaSendResultHandler; -import com.god.data.server.test.KafkaMessageTest; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Controller; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; - -/** - * @description: 卡芙卡测试 - * @Author fst - * @date 2023/11/21 20:21 - */ -@RequestMapping("kafka") -@Controller -public class KafkaController { - 
@Autowired - KafkaMessageTest messageTest; - - private KafkaTemplate kafkaTemplate; - - //使用构造器注入 - public KafkaController(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler){ - this.kafkaTemplate=kafkaTemplate; - this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); - } - - - @GetMapping("test001") - public Result test001(){ - messageTest.providerMessage(); - messageTest.consumerMessage(); - System.out.println("11111"); - return Result.success(); - } - - @GetMapping("test002") - @Transactional - public Result test002(){ - kafkaTemplate.send("test002","能不能测试成功"); - return Result.success(); - } -} diff --git a/car-data-server/src/main/java/com/god/data/server/listeners/KafkaListenerTest.java b/car-data-server/src/main/java/com/god/data/server/listeners/KafkaListenerTest.java deleted file mode 100644 index 26edc08..0000000 --- a/car-data-server/src/main/java/com/god/data/server/listeners/KafkaListenerTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.god.data.server.listeners; - -import com.god.common.redis.service.RedisService; -import lombok.extern.log4j.Log4j; -import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.stereotype.Component; - -/** - * @description: kafka消费监听 - * @Author fst - * @date 2023/11/21 21:24 - */ -@Component -@Log4j2 -public class KafkaListenerTest { - - @Autowired - private RedisService redisService; - - - /** - * @description: kafka监听 - */ - @KafkaListener(topics = { "test002" }, - containerFactory = "kafkaListenerContainerFactory", - errorHandler = "myKafkaListenerErrorHandler") - public void kafkaProducer(ConsumerRecord record, Acknowledgment acknowledgment){ - //获取消息 - String value = (String) record.value(); - log.info("监听到消息,开始消费,消息为:{}",value); - 
//获取消息的key - String key = (String) record.key(); - System.out.println("======================"); - System.out.println(value); - //手动确认 - acknowledgment.acknowledge(); - } - -} diff --git a/car-data-server/src/main/java/com/god/data/server/test/KafkaMessageTest.java b/car-data-server/src/main/java/com/god/data/server/test/KafkaMessageTest.java deleted file mode 100644 index 02b77fa..0000000 --- a/car-data-server/src/main/java/com/god/data/server/test/KafkaMessageTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.god.data.server.test; - -import com.god.data.server.config.KafkaConsumerConfig; -import com.god.data.server.config.KafkaProviderConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.protocol.types.Field; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import java.time.Duration; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; - -/** - * @description: kafka消费测试一 - * @Author fst - * @date 2023/11/21 18:42 - */ -@Component -public class KafkaMessageTest { - - @Autowired - KafkaProviderConfig kafkaProviderConfig; - - @Autowired - KafkaConsumerConfig kafkaConsumerConfig; - - - /** - * @description: Kafka生产者 - */ - public void providerMessage(){ - //通过配置工具类获取配置map - Map stringObjectMap = kafkaProviderConfig.producerConfigs(); - Properties properties = new Properties(); - properties.putAll(stringObjectMap); - //通过map来生成kafka生产者对象 - KafkaProducer stringStringKafkaProducer = new KafkaProducer(properties); - //发送消息 封装发送消息对象 - ProducerRecord stringStringProducerRecord = new ProducerRecord<>("test001", 
"hello_kafka"); - stringStringKafkaProducer.send(stringStringProducerRecord); - //关闭通道 - stringStringKafkaProducer.close(); - } - - /** - * @description: 独立消费者-订阅主题 - */ - public void consumerMessage(){ - Map stringObjectMap = kafkaConsumerConfig.consumerConfigs(); - Properties properties = new Properties(); - properties.putAll(stringObjectMap); - //构建kafka消费者对象 - KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); - //订阅主题 - kafkaConsumer.subscribe(Collections.singleton("test001")); - //循环获取消息 - - ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord consumerRecord : poll) { - System.out.println("===================="); - System.out.println(consumerRecord.key()); - System.out.println(consumerRecord.value()); - } - - } -} diff --git a/pom.xml b/pom.xml index 8925b27..d44976f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,12 +7,6 @@ god-car 3.6.3 - pom - - car-data-common - car-data-remote - car-data-server - 4.0.0 3.6.3 @@ -22,6 +16,12 @@ god-car-data车联网数据解析层 + + 17 + 17 + UTF-8 + + menghang-public @@ -31,6 +31,7 @@ + menghang-releases 梦航-releases @@ -38,5 +39,129 @@ + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-clients + + + + + org.apache.kafka + kafka-clients + + + + com.god + car-data-common + 3.6.3 + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + io.springfox + springfox-swagger-ui + ${swagger.fox.version} + + + + + com.mysql + mysql-connector-j + + + junit + junit + + + org.junit.jupiter + junit-jupiter + + + + + com.god + god-common-datasource + + + + + com.god + god-common-datascope + + + + + com.god + god-common-log + + + + + com.god + god-common-swagger + + + + + + + + ${project.artifactId} 
+ + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + + diff --git a/car-data-server/src/main/java/com/god/data/server/GodCarDataApplication.java b/src/main/java/com/god/data/GodCarDataApplication.java similarity index 100% rename from car-data-server/src/main/java/com/god/data/server/GodCarDataApplication.java rename to src/main/java/com/god/data/GodCarDataApplication.java diff --git a/src/main/java/com/god/data/common/domain/CarMessage.java b/src/main/java/com/god/data/common/domain/CarMessage.java new file mode 100644 index 0000000..edd2093 --- /dev/null +++ b/src/main/java/com/god/data/common/domain/CarMessage.java @@ -0,0 +1,256 @@ +package com.god.data.common.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; + +/** + * @description: 车辆报文接收对象 + * @Author fst + * @date 2023/11/23 14:33 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CarMessage { + /** + * 车辆VIN + */ + private String vin; + + /** + * 时间戳 + */ + private Long timeString; + + /** + * 经度 + */ + private BigDecimal longitude; + + /** + * 纬度 + */ + private BigDecimal latitude; + + /** + * 速度 单位:KM/H + */ + private BigDecimal speed; + + /** + * 总里程 + */ + private BigDecimal sumMileage; + + /** + * 总电压 + */ + private BigDecimal sumVoltage; + + /** + * 总电流 + */ + private BigDecimal sumElectricity; + + /** + * 绝缘电阻 + */ + private BigDecimal sumResistance; + + /** + * 车辆档位 + */ + private String gear; + + /** + * 加速踏板行程值 + */ + private BigDecimal acceleratorPedal; + + /** + *制动踏板行程值 + */ + private BigDecimal brakePedal; + + /** + * Specific fuel consumption 燃料消耗率 + */ + private BigDecimal specificFuelConsumption; + + /** + * 电池控制器温度 + */ + private BigDecimal motorControllerTemperature; + + /** + * 电机转速 + */ + private Long motorSpeed; + + /** + * 电机转矩 + */ + private Long 
motorTorque; + + /** + * 电机温度 + */ + private BigDecimal motorTemperature; + + /** + * 电机电业 + */ + private BigDecimal motorTage; + + /** + * 电机电流 + */ + private BigDecimal motorCurrent; + + /** + * 动力电池剩余电流SOC + */ + private BigDecimal remainingBattery; + + /** + * 当前状态允许的最大反馈功率 + */ + private BigDecimal maximumFeedbackPower; + + /** + * 当前状态允许最大放电功率 + */ + private BigDecimal maximumDischargePower; + + /** + * BMS自检计数器 + */ + private Integer selfCheckCounter; + + /** + * 动力电池充放电电流 + */ + private BigDecimal totalBatteryCurrent; + + /** + * 动力电池负载端总电压V3 + */ + private BigDecimal totalBatteryVoltage; + + /** + * 单次电池最大电压 + */ + private BigDecimal singleBatteryMaxVoltage; + + /** + * 单体电池最低电压 + */ + private BigDecimal singleBatteryMinVoltage; + + /** + * 单体电池最高温度 + */ + private BigDecimal singleBatteryMaxTemperature; + + /** + * 单体电池最低温度 + */ + private BigDecimal singleBatteryMinTemperature; + + /** + * 动力电池可用容量 + */ + private BigDecimal availableBatteryCapacity; + + /** + * 车辆状态 + */ + private Integer vehicleStatus; + + /** + * 充电状态 + */ + private Integer chargingStatus; + + /** + * 运行状态 + */ + private Integer operatingStatus; + + /** + * SOC + */ + private Integer socStatus; + + /** + * 可充电储能装置工作状态 + */ + private Integer chargingEnergyStorageStatus; + + /** + * 驱动电池状态 + */ + private Integer driveMotorStatus; + + /** + * 定位是否有效 + */ + private Integer positionStatus; + + /** + * EAS状态 + */ + private Integer easStatus; + + /** + * PTC状态 + */ + private Integer ptcStatus; + + /** + * EPS状态 + */ + private Integer epsStatus; + + /** + * ABS状态 + */ + private Integer absStatus; + + /** + * MCU状态 + */ + private Integer mcuStatus; + + /** + * 动力电池加热状态 + */ + private Integer heatingStatus; + + /** + * 动力电池当前状态 + */ + private Integer batteryStatus; + + + /** + * 动力电池保温状态 + */ + private Integer batteryInsulationStatus; + + /** + * DCDC状态 + */ + private Integer dcdcStatus; + + /** + * CHG状态 + */ + private Integer chgStatus; + +} diff --git 
a/src/main/java/com/god/data/common/domain/LogMessage.java b/src/main/java/com/god/data/common/domain/LogMessage.java new file mode 100644 index 0000000..4640a7a --- /dev/null +++ b/src/main/java/com/god/data/common/domain/LogMessage.java @@ -0,0 +1,25 @@ +package com.god.data.common.domain; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * rabbitmq消息内容载体 + * @deprecated 消息内容载体,在rabbitmq中,存储的消息可以是任意的java类型的对象。 + * 强制要求,作为消息数据载体的类型,必须是Serializable的。 + * 如果消息数据载体类型未实现Serializable,在收发消息的时候,都会有异常发生。 + */ +@Data +public class LogMessage implements Serializable { + + private Long id; + private String msg; + private String logLevel; + private String serviceType; + private Date createTime; + private Long userId; + private String exchange; + +} diff --git a/src/main/java/com/god/data/config/kafka/KafkaConsumerConfig.java b/src/main/java/com/god/data/config/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..3d85973 --- /dev/null +++ b/src/main/java/com/god/data/config/kafka/KafkaConsumerConfig.java @@ -0,0 +1,89 @@ +package com.god.data.config.kafka; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.List; +import java.util.Properties; + +import static com.god.data.contents.KafkaContent.TOPIC; + +/** + * @author fst + * @date 2023/11/24 17:30 + * kafka配置,也可以写在yml,这个文件会覆盖yml + */ +@Data +@Builder +@Configuration +@NoArgsConstructor +@AllArgsConstructor +@ConfigurationProperties(prefix = "spring.kafka.consumer") +public class KafkaConsumerConfig { + // kafka消费者服务器 + 
private String bootstrapServers; + // 分组id + private String groupId; + // 是否自动提交偏移量,默认为true,为了避免出现重复数据和数据丢失,设置为false,手动提交 + private boolean enableAutoCommit; + + private Integer autoCommitInterval; + + private String sessionTimeout; + + private String maxPollIntervalTime; + + private String maxPollRecords; + + private String autoOffsetReset; + + private String keyDeserializer; + + private String valueDeserializer; + + @Bean + public KafkaConsumer consumerConfigs() { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + //自动提交的时间间隔,自动提交开启时生效 + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); + //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: + //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录 + //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录) + //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常 + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance + properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); + //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。 + //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息, + //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance, + //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。 + //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 + //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); + //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); + //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类) + 
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + //设置分区器 + properties.put("partitioner.class", "com.god.kafka.partitioner.MyPartitioner"); + KafkaConsumer consumer = new KafkaConsumer(properties); + consumer.subscribe(List.of(TOPIC)); + return consumer; + } + + + +} diff --git a/car-data-server/src/main/java/com/god/data/server/config/KafkaSendResultHandler.java b/src/main/java/com/god/data/config/kafka/KafkaSendResultHandler.java similarity index 95% rename from car-data-server/src/main/java/com/god/data/server/config/KafkaSendResultHandler.java rename to src/main/java/com/god/data/config/kafka/KafkaSendResultHandler.java index d6ca03a..e79502e 100644 --- a/car-data-server/src/main/java/com/god/data/server/config/KafkaSendResultHandler.java +++ b/src/main/java/com/god/data/config/kafka/KafkaSendResultHandler.java @@ -1,4 +1,4 @@ -package com.god.data.server.config; +package com.god.data.config.kafka; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; diff --git a/car-data-server/src/main/java/com/god/data/server/config/MyKafkaListenerErrorHandler.java b/src/main/java/com/god/data/config/kafka/MyKafkaListenerErrorHandler.java similarity index 97% rename from car-data-server/src/main/java/com/god/data/server/config/MyKafkaListenerErrorHandler.java rename to src/main/java/com/god/data/config/kafka/MyKafkaListenerErrorHandler.java index 5286a3d..8ef5557 100644 --- a/car-data-server/src/main/java/com/god/data/server/config/MyKafkaListenerErrorHandler.java +++ b/src/main/java/com/god/data/config/kafka/MyKafkaListenerErrorHandler.java @@ -1,4 +1,4 @@ -package com.god.data.server.config; +package com.god.data.config.kafka; import org.apache.kafka.clients.consumer.Consumer; diff --git a/src/main/java/com/god/data/config/rabbitmq/ConfirmCallbackConfig.java 
b/src/main/java/com/god/data/config/rabbitmq/ConfirmCallbackConfig.java new file mode 100644 index 0000000..ce51241 --- /dev/null +++ b/src/main/java/com/god/data/config/rabbitmq/ConfirmCallbackConfig.java @@ -0,0 +1,48 @@ +package com.god.data.config.rabbitmq; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送确认配置 + * 消息发送到交换机的回调 + */ +@Component +@Log4j2 +public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this); + } + + /** + * 交换机不管是否收到消息的一个回调方法 + * + * @param correlationData 消息相关数据 + * @param ack 交换机是否收到消息 + * @param cause 失败原因 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (!ack) { + String exchange = correlationData.getReturned().getExchange(); + String message = correlationData.getReturned().getMessage().getBody().toString(); + // 发送异常 + log.error("消息:{},发送到交换机:{}失败,原因是:{}", message, exchange, cause); + // TODO 可以把异常信息 以及 消息的内容直接添加到 MYSQL + } + } + +} diff --git a/src/main/java/com/god/data/config/rabbitmq/ReturnCallbackConfig.java b/src/main/java/com/god/data/config/rabbitmq/ReturnCallbackConfig.java new file mode 100644 index 0000000..75ca64a --- /dev/null +++ b/src/main/java/com/god/data/config/rabbitmq/ReturnCallbackConfig.java @@ -0,0 +1,41 @@ +package com.god.data.config.rabbitmq; + +import lombok.extern.log4j.Log4j2; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * 消息发送到队列的确认 + */ +@Component +@Log4j2 +public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + /** + * @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执 + */ + @PostConstruct + public void init() { + rabbitTemplate.setReturnsCallback(this); + } + + /** + * 消息发送失败 则会执行这个方法 + * + * @param returnedMessage the returned message and metadata. + */ + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("消息:{},被交换机:{} 回退!退回原因为:{}", + returnedMessage.getMessage().toString(), returnedMessage.getExchange(), returnedMessage.getReplyText()); + // TODO 回退了所有的信息,可做补偿机制 + } + +} diff --git a/src/main/java/com/god/data/contents/KafkaContent.java b/src/main/java/com/god/data/contents/KafkaContent.java new file mode 100644 index 0000000..90028d5 --- /dev/null +++ b/src/main/java/com/god/data/contents/KafkaContent.java @@ -0,0 +1,13 @@ +package com.god.data.contents; + +/** + * @author DongZl + * @description: kafka常量类 + * @Date 2023/8/25 18:47 + */ +public class KafkaContent { + + public static final String TOPIC = "top"; + + public static final String KAFKA_CON = "39.100.65.135:39092,39.100.65.135:29092,39.100.65.135:19092"; +} diff --git a/src/main/java/com/god/data/contents/RabbitmqContent.java b/src/main/java/com/god/data/contents/RabbitmqContent.java new file mode 100644 index 0000000..b193283 --- /dev/null +++ b/src/main/java/com/god/data/contents/RabbitmqContent.java @@ -0,0 +1,9 @@ +package com.god.data.contents; + +/** + * rabbitmq常量类 + * @Author fst + * @date 2023/11/28 0:07 + */ +public class RabbitmqContent { +} diff --git a/src/main/java/com/god/data/listeners/KafkaListenerTest.java b/src/main/java/com/god/data/listeners/KafkaListenerTest.java new file mode 100644 index 
0000000..f4ae7d8 --- /dev/null +++ b/src/main/java/com/god/data/listeners/KafkaListenerTest.java @@ -0,0 +1,42 @@ +//package com.god.data.listeners; +// +//import com.god.common.redis.service.RedisService; +//import lombok.extern.log4j.Log4j2; +//import org.apache.kafka.clients.consumer.ConsumerRecord; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.kafka.support.Acknowledgment; +//import org.springframework.stereotype.Component; +// +///** +// * @description: kafka消费监听 +// * @Author fst +// * @date 2023/11/21 21:24 +// */ +//@Component +//@Log4j2 +//public class KafkaListenerTest { +// +// @Autowired +// private RedisService redisService; +// +// +// /** +// * @description: kafka监听 +// */ +// @KafkaListener(topics = { "test002" }, +// containerFactory = "kafkaListenerContainerFactory", +// errorHandler = "myKafkaListenerErrorHandler") +// public void kafkaProducer(ConsumerRecord record, Acknowledgment acknowledgment){ +// //获取消息 +// String value = (String) record.value(); +// log.info("监听到消息,开始消费,消息为:{}",value); +// //获取消息的key +// String key = (String) record.key(); +// System.out.println("======================"); +// System.out.println(value); +// //手动确认 +// acknowledgment.acknowledge(); +// } +// +//} diff --git a/src/main/java/com/god/data/partitioner/MyPartitioner.java b/src/main/java/com/god/data/partitioner/MyPartitioner.java new file mode 100644 index 0000000..2a5d317 --- /dev/null +++ b/src/main/java/com/god/data/partitioner/MyPartitioner.java @@ -0,0 +1,43 @@ +package com.god.data.partitioner; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +/** + * @author:fst + * @date:2023/11/25 + * @aim:自定义分区器 + */ +@Component +public class MyPartitioner implements 
Partitioner { + /** + * 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区) + * @param topic 消息队列名 + * @param key 用户传入key + * @param keyBytes key字节数组 + * @param value 用户传入value + * @param valueBytes value字节数组 + * @param cluster 当前kafka节点数 + * @return 如果3个分区,返回 0 1 2 + */ + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + //获取topic的partitions信息 + List partitionInfos = cluster.partitionsForTopic(topic); + int partitionsNum = partitionInfos.size(); + // 这里以 key 的哈希值作为分区选择依据 + System.out.println("================================"); + System.out.println(Math.abs(key.hashCode()) % partitionsNum); + return Math.abs(key.hashCode()) % partitionsNum; + } + + public void close() { + } + + public void configure(Map map) { + } +} diff --git a/src/main/java/com/god/data/rabbitmq/producer/Sender.java b/src/main/java/com/god/data/rabbitmq/producer/Sender.java new file mode 100644 index 0000000..be0411e --- /dev/null +++ b/src/main/java/com/god/data/rabbitmq/producer/Sender.java @@ -0,0 +1,37 @@ +package com.god.data.rabbitmq.producer; + +import com.god.data.common.domain.LogMessage; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +/** + * 消息发送者 - Producer。 + * @Component Producer类型的对象,必须交由Spring容器管理。 + * 使用SpringBoot提供的AMQP启动器,来访问rabbitmq的时候,都是通过AmqpTemplate来实现的。 + * 如果全局配置文件中,配置了rabbitmq相关内容,且工程依赖了starter-amqp,则spring容器自动创建AmqpTemplate对象。 + */ +@Component +public class Sender { + + @Autowired + private AmqpTemplate rabbitAmqpTemplate; + + /* + * 发送消息的方法 + */ + public void send(LogMessage msg){ + /* + + convertAndSend - 转换并发送消息的template方法。 + 是将传入的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。 + 参数一:交换器名称。 类型是String + 参数二:路由键。 类型是String + 参数三:消息,是要发送的消息内容对象。类型是Object + */ + 
this.rabbitAmqpTemplate.convertAndSend(msg.getExchange(), UUID.randomUUID().toString(), msg); + } +} diff --git a/src/main/java/com/god/data/service/EventService.java b/src/main/java/com/god/data/service/EventService.java new file mode 100644 index 0000000..1c038fa --- /dev/null +++ b/src/main/java/com/god/data/service/EventService.java @@ -0,0 +1,27 @@ +package com.god.data.service; + +import com.god.data.common.domain.CarMessage; + +/** + * @description: 事件 + * @Author fst + * @date 2023/11/27 20:40 + */ +public interface EventService { + + /** + * 添加事件 + */ + public void insert(); + + /** + * 执行事件 + */ + public void execute(CarMessage carMessage); + + /** + * 删除事件 + * @param event + */ + public void remove(String event); +} diff --git a/src/main/java/com/god/data/service/ParseDataService.java b/src/main/java/com/god/data/service/ParseDataService.java new file mode 100644 index 0000000..2cb163f --- /dev/null +++ b/src/main/java/com/god/data/service/ParseDataService.java @@ -0,0 +1,65 @@ +package com.god.data.service; + +import com.god.common.core.utils.SpringUtils; +import com.god.common.redis.service.RedisService; +import com.god.data.common.domain.CarMessage; +import com.god.data.utils.AnalyzeUtils; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.List; + +/** + * 获取kafka数据进行解析,并执行事件 + * @author fst + * @Date 2023-11-27 下午 20:02 + */ +@Log4j2 +@Component +public class ParseDataService { + @Autowired + private RedisService redisService; + + private final KafkaConsumer consumer; + + @Autowired + public ParseDataService(KafkaConsumer consumer) { + this.consumer = consumer; + } + + @PostConstruct + public 
void start(){ + new Thread(() -> { + while (true){ + ConsumerRecords records = null; + try { + //死循环拉取kafka数据 + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + CarMessage carMessage = AnalyzeUtils.parseVehicleData(record.value()); + //根据对象车辆vin获取事件集合,从redis中获取 + List eventList = redisService.getCacheList("event" + carMessage.getVin()); + //执行事件 + if(eventList!= null && !eventList.isEmpty()){ + for (String event : eventList) { + EventService eventService = SpringUtils.getBean(event); + eventService.execute(carMessage); + } + } + System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value()); + } + }catch (Exception e){ + log.info("records: {}", records); + log.error(e); + } + } + }).start(); + } + +} diff --git a/src/main/java/com/god/data/service/impl/ElectronicFenceEvent.java b/src/main/java/com/god/data/service/impl/ElectronicFenceEvent.java new file mode 100644 index 0000000..881f026 --- /dev/null +++ b/src/main/java/com/god/data/service/impl/ElectronicFenceEvent.java @@ -0,0 +1,28 @@ +package com.god.data.service.impl; + +import com.god.data.common.domain.CarMessage; +import com.god.data.service.EventService; +import org.springframework.stereotype.Service; + +/** + * 电子围栏数据解析事件 + * @Author fst + * @date 2023/11/27 20:43 + */ +@Service(value = "Fence") +public class ElectronicFenceEvent implements EventService { + @Override + public void insert() { + + } + + @Override + public void execute(CarMessage carMessage) { + + } + + @Override + public void remove(String event) { + + } +} diff --git a/src/main/java/com/god/data/service/impl/FaultAlarmEvent.java b/src/main/java/com/god/data/service/impl/FaultAlarmEvent.java new file mode 100644 index 0000000..53c7871 --- /dev/null +++ b/src/main/java/com/god/data/service/impl/FaultAlarmEvent.java @@ -0,0 +1,45 @@ +package com.god.data.service.impl; + +import com.god.common.redis.service.RedisService; +import com.god.data.common.domain.CarMessage; +import 
com.god.data.service.EventService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; + +/** + * 故障报警事件 + * @Author fst + * @date 2023/11/27 21:50 + */ +@Service(value = "FaultAlarm") +public class FaultAlarmEvent implements EventService { + @Autowired + private RedisService redisService; + + + + @Override + public void insert() { + + } + + @Override + public void execute(CarMessage carMessage) { + //创建集合存故障码 + ArrayList strings = new ArrayList<>(); + //判断车辆目前的报错故障,添加对应的故障码 + if (carMessage.getBatteryStatus()==1){ + strings.add("111"); + } + //把对应的故障码存入rabbitmq + + + } + + @Override + public void remove(String event) { + + } +} diff --git a/src/main/java/com/god/data/service/impl/RealTimeTrajectoryEvent.java b/src/main/java/com/god/data/service/impl/RealTimeTrajectoryEvent.java new file mode 100644 index 0000000..d8daffe --- /dev/null +++ b/src/main/java/com/god/data/service/impl/RealTimeTrajectoryEvent.java @@ -0,0 +1,28 @@ +package com.god.data.service.impl; + +import com.god.data.common.domain.CarMessage; +import com.god.data.service.EventService; +import org.springframework.stereotype.Service; + +/** + * 实时轨迹事件 + * @Author fst + * @date 2023/11/27 21:47 + */ +@Service(value = "RealTimeTrajectory") +public class RealTimeTrajectoryEvent implements EventService { + @Override + public void insert() { + + } + + @Override + public void execute(CarMessage carMessage) { + + } + + @Override + public void remove(String event) { + + } +} diff --git a/src/main/java/com/god/data/test/KafkaMessageTest.java b/src/main/java/com/god/data/test/KafkaMessageTest.java new file mode 100644 index 0000000..e8d6629 --- /dev/null +++ b/src/main/java/com/god/data/test/KafkaMessageTest.java @@ -0,0 +1,69 @@ +//package com.god.data.test; +// +//import org.apache.kafka.clients.consumer.ConsumerRecord; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import 
org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.clients.producer.KafkaProducer; +//import org.apache.kafka.clients.producer.ProducerRecord; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Component; +// +//import java.time.Duration; +//import java.util.Collections; +//import java.util.Map; +//import java.util.Properties; +// +///** +// * @description: kafka消费测试一 +// * @Author fst +// * @date 2023/11/21 18:42 +// */ +//@Component +//public class KafkaMessageTest { +// +// @Autowired +// KafkaProviderConfig kafkaProviderConfig; +// +// @Autowired +// KafkaConsumerConfig kafkaConsumerConfig; +// +// +// /** +// * @description: Kafka生产者 +// */ +// public void providerMessage(){ +// //通过配置工具类获取配置map +// Map stringObjectMap = kafkaProviderConfig.producerConfigs(); +// Properties properties = new Properties(); +// properties.putAll(stringObjectMap); +// //通过map来生成kafka生产者对象 +// KafkaProducer stringStringKafkaProducer = new KafkaProducer(properties); +// //发送消息 封装发送消息对象 +// ProducerRecord stringStringProducerRecord = new ProducerRecord<>("test001", "hello_kafka"); +// stringStringKafkaProducer.send(stringStringProducerRecord); +// //关闭通道 +// stringStringKafkaProducer.close(); +// } +// +// /** +// * @description: 独立消费者-订阅主题 +// */ +// public void consumerMessage(){ +// Map stringObjectMap = kafkaConsumerConfig.consumerConfigs(); +// Properties properties = new Properties(); +// properties.putAll(stringObjectMap); +// //构建kafka消费者对象 +// KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); +// //订阅主题 +// kafkaConsumer.subscribe(Collections.singleton("test001")); +// //循环获取消息 +// +// ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000)); +// for (ConsumerRecord consumerRecord : poll) { +// System.out.println("===================="); +// System.out.println(consumerRecord.key()); +// System.out.println(consumerRecord.value()); +// } +// +// } +//} diff --git 
a/src/main/java/com/god/data/test/Test007.java b/src/main/java/com/god/data/test/Test007.java new file mode 100644 index 0000000..1a5bc22 --- /dev/null +++ b/src/main/java/com/god/data/test/Test007.java @@ -0,0 +1,34 @@ +package com.god.data.test; + +import com.alibaba.fastjson2.JSONObject; +import com.god.data.common.domain.CarMessage; +import com.god.data.utils.AnalyzeUtils; +import org.junit.jupiter.api.Test; + +/** + * @description: 解析测试 + * @Author fst + * @date 2023/11/23 21:59 + */ +public class Test007 { + + /** + * 测试方法:showStr + */ + @Test + public void showStr(){ + /* + * 字符串变量str存储了待解析的车辆数据 + */ + String str = "7E 56 49 4e 31 32 33 34 35 36 37 38 39 31 32 33 34 35 31 37 30 30 37 34 37 34 37 35 37 39 34 31 31 36 2e 36 37 30 32 33 35 30 33 39 2e 35 33 38 39 39 36 30 31 38 30 2e 30 30 31 33 2e 30 38 30 30 30 30 30 30 36 38 36 30 30 30 34 38 30 30 30 37 35 37 31 30 30 30 30 30 44 37 30 39 30 31 30 2e 30 30 36 32 30 30 30 30 31 34 33 32 30 35 33 39 30 38 31 30 30 30 30 31 36 33 30 30 37 36 35 39 30 30 30 30 34 34 39 35 38 2e 37 38 30 30 30 30 32 30 30 30 30 30 31 33 31 31 30 30 30 32 36 33 30 30 30 33 30 30 30 33 30 30 30 31 30 30 30 30 30 35 33 30 30 30 30 37 39 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 30 dc 7E"; + /* + * 使用AnalyzeUtils类的parseVehicleData方法解析车辆数据 + */ + CarMessage carMessage = AnalyzeUtils.parseVehicleData(str); + /* + * 打印解析后的车辆信息 + */ + System.out.println(carMessage); + } + +} diff --git a/src/main/java/com/god/data/utils/AnalyzeUtils.java b/src/main/java/com/god/data/utils/AnalyzeUtils.java new file mode 100644 index 0000000..a87b2d4 --- /dev/null +++ b/src/main/java/com/god/data/utils/AnalyzeUtils.java @@ -0,0 +1,90 @@ +package com.god.data.utils; + +import com.god.data.common.domain.CarMessage; + +import java.math.BigDecimal; + +/** + * 报文解析工具类 + * @description: 报文解析工具类 + * @Author fst + * @date 2023/11/23 14:30 + */ +public class AnalyzeUtils { + + /** + * 解析报文获取报文对象 + */ + public static CarMessage 
parseVehicleData(String hexInput) { + //先去除空格 + String inputString = hexInput.replaceAll(" ", ""); + //调用工具类解析报文 + String input = hexToString(inputString); + CarMessage carMessage = new CarMessage(); + CarMessage.builder() + .vin(input.substring(1, 17)) + .timeString(Long.valueOf(input.substring(18, 30))) + .longitude(BigDecimal.valueOf(Double.parseDouble(input.substring(31, 41)))) + .latitude(BigDecimal.valueOf(Double.parseDouble(input.substring(42, 51)))) + .speed(BigDecimal.valueOf(Double.parseDouble(input.substring(52, 57)))) + .sumMileage(BigDecimal.valueOf(Double.parseDouble(input.substring(58, 68)))) + .sumVoltage(BigDecimal.valueOf(Double.parseDouble(input.substring(69, 74)))) + .sumElectricity(BigDecimal.valueOf(Double.parseDouble(input.substring(75, 79)))) + .sumResistance(BigDecimal.valueOf(Double.parseDouble(input.substring(80, 88)))) + .gear(input.substring(89, 89)) + .acceleratorPedal(BigDecimal.valueOf(Double.parseDouble(input.substring(90, 91)))) + .brakePedal(BigDecimal.valueOf(Double.parseDouble(input.substring(92, 93)))) + .specificFuelConsumption(BigDecimal.valueOf(Double.parseDouble(input.substring(94, 98)))) + .motorControllerTemperature(BigDecimal.valueOf(Double.parseDouble(input.substring(99, 104)))) + .motorSpeed(Long.valueOf(input.substring(105, 109))) + .motorTorque(Long.valueOf(input.substring(110, 113))) + .motorTemperature(BigDecimal.valueOf(Double.parseDouble(input.substring(114, 119)))) + .motorTage(BigDecimal.valueOf(Double.parseDouble(input.substring(120, 124)))) + .motorCurrent(BigDecimal.valueOf(Double.parseDouble(input.substring(125, 132)))) + .remainingBattery(BigDecimal.valueOf(Double.parseDouble(input.substring(133, 138)))) + .maximumFeedbackPower(BigDecimal.valueOf(Double.parseDouble(input.substring(139, 144)))) + .maximumDischargePower(BigDecimal.valueOf(Double.parseDouble(input.substring(145, 150)))) + .selfCheckCounter(Integer.valueOf(input.substring(151, 152))) + 
.totalBatteryCurrent(BigDecimal.valueOf(Double.parseDouble(input.substring(153, 157)))) + .totalBatteryVoltage(BigDecimal.valueOf(Double.parseDouble(input.substring(158, 163)))) + .singleBatteryMaxVoltage(BigDecimal.valueOf(Double.parseDouble(input.substring(164, 167)))) + .singleBatteryMinVoltage(BigDecimal.valueOf(Double.parseDouble(input.substring(168, 171)))) + .singleBatteryMaxTemperature(BigDecimal.valueOf(Double.parseDouble(input.substring(172, 177)))) + .singleBatteryMinTemperature(BigDecimal.valueOf(Double.parseDouble(input.substring(178, 183)))) + .availableBatteryCapacity(BigDecimal.valueOf(Double.parseDouble(input.substring(184, 189)))) + .vehicleStatus(Integer.valueOf(input.substring(190, 190))) + .chargingStatus(Integer.valueOf(input.substring(191, 191))) + .operatingStatus(Integer.valueOf(input.substring(192, 192))) + .socStatus(Integer.valueOf(input.substring(193, 193))) + .chargingEnergyStorageStatus(Integer.valueOf(input.substring(194, 194))) + .driveMotorStatus(Integer.valueOf(input.substring(195, 195))) + .positionStatus(Integer.valueOf(input.substring(196, 196))) + .easStatus(Integer.valueOf(input.substring(197, 197))) + .ptcStatus(Integer.valueOf(input.substring(198, 198))) + .epsStatus(Integer.valueOf(input.substring(199, 199))) + .absStatus(Integer.valueOf(input.substring(200, 200))) + .mcuStatus(Integer.valueOf(input.substring(201, 201))) + .heatingStatus(Integer.valueOf(input.substring(202, 202))) + .batteryStatus(Integer.valueOf(input.substring(203, 203))) + .batteryInsulationStatus(Integer.valueOf(input.substring(204, 204))) + .dcdcStatus(Integer.valueOf(input.substring(205, 205))) + .chgStatus(Integer.valueOf(input.substring(206, 206))); + //返回解析完的对象 + return carMessage; + } + + + private static String hexToString(String hex) { + StringBuilder output = new StringBuilder(); + for (int i = 0; i < hex.length(); i += 2) { + String str = hex.substring(i, i + 2); + output.append((char) Integer.parseInt(str, 16)); + } + return 
output.toString(); + } + + + + + +} diff --git a/car-data-server/src/main/resources/banner.txt b/src/main/resources/banner.txt similarity index 100% rename from car-data-server/src/main/resources/banner.txt rename to src/main/resources/banner.txt diff --git a/car-data-server/src/main/resources/bootstrap.yml b/src/main/resources/bootstrap.yml similarity index 72% rename from car-data-server/src/main/resources/bootstrap.yml rename to src/main/resources/bootstrap.yml index c5d53ff..c924f82 100644 --- a/car-data-server/src/main/resources/bootstrap.yml +++ b/src/main/resources/bootstrap.yml @@ -25,25 +25,6 @@ spring: - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} # kafka kafka: - producer: - # Kafka生产者服务器 - bootstrap-servers: 10.100.1.8:9092 - transaction-id-prefix: kafkaTx- - retries: 3 - # acks=0 : 生产者成功写入消息不会等待任何来自服务器的相应 - # acks=1 : 只要集群的master收到消息,生产者就会收到一个来自服务器的响应。 - # acks=all : 只有当所有参与者的复制节点全部收到消息时,生产者才会收到一个来自服务器的响应 - # 开启事务时 必须设置为all - acks: all - # 当有多个消息需要被发送到同一分区时,生产者会把他们放在同一批次里。 - batch-size: 16384 - # 生产者内存缓冲区的大小 - buffer-memory: 1024000 - # 键的序列化方式 - key-serializer: org.springframework.kafka.support.serializer.JsonSerializer - # 值的序列化方式 - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - consumer: # kafka消费者服务器 bootstrap-servers: 10.100.1.8:9092 @@ -57,11 +38,11 @@ spring: # 是否自动提交偏移量,默认为true,为了避免出现重复数据和数据丢失,设置为false,手动提交 enable-auto-commit: false # 键的序列化方式 - key-deserializer: org.springframework.kafka.support.serializer.JsonSerializer + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的序列化方式 - value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 自动提交时间间隔 - #auto-commit-interval: 2s + auto-commit-interval: 2000 # 配置消费者的Json反序列化的可信赖包,反序列化实体需要 properties: spring: @@ -75,6 +56,10 @@ spring: # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数 #
注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况 max-poll-records: 3 + max-poll-interval-time: 600000 + # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s + session-timeout: 10000 + properties: # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance max: @@ -92,9 +77,23 @@ spring: ack-mode: manual_immediate # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略报错 missing-topics-fatal: false - # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance - poll-timeout: 600000 - + #rabbitmq配置 + rabbitmq: + port: 5672 + host: 10.100.1.5 + username: guest + password: guest + #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调 + publisher-confirm-type: correlated + #保证交换机能把消息推送到队列中 + publisher-returns: true + virtual-host: / + #这个配置是保证消费者会消费消息,手动确认 + listener: + simple: + acknowledge-mode: manual + template: + mandatory: true logging: level: com.god.system.mapper: DEBUG diff --git a/car-data-server/src/main/resources/logback.xml b/src/main/resources/logback.xml similarity index 100% rename from car-data-server/src/main/resources/logback.xml rename to src/main/resources/logback.xml