fix:()kafka读取不到的问题

dev.vehiclegateway
袁子龙 2024-10-06 10:09:48 +08:00
parent c5f0d52aa6
commit a63bde6c84
5 changed files with 89 additions and 26 deletions

View File

@ -19,21 +19,22 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<!-- <dependency>-->
<!-- <groupId>org.apache.kafka</groupId>-->
<!-- <artifactId>kafka-clients</artifactId>-->
<!-- <version>3.0.0</version>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.muyu</groupId> <groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId> <artifactId>cloud-common-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies> </dependencies>

View File

@ -3,7 +3,6 @@ import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.muyu.common.domain.MessageTemplateType; import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.kafka.config.KafkaProducerConfig;
import com.muyu.common.redis.service.RedisService; import com.muyu.common.redis.service.RedisService;
import com.muyu.server.service.MessageTemplateTypeService; import com.muyu.server.service.MessageTemplateTypeService;
import com.muyu.server.service.SysCarService; import com.muyu.server.service.SysCarService;
@ -14,11 +13,8 @@ import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List; import java.util.List;
/** /**
* *

View File

@ -1,19 +1,17 @@
package com.muyu.server.controller; package com.muyu.server.controller;
import com.alibaba.fastjson2.JSONObject; import com.muyu.cache.SysCarCacheService;
import com.muyu.common.core.domain.Result; import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.SysCar; import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.req.SysCarReq; import com.muyu.common.domain.req.SysCarReq;
import com.muyu.common.domain.resp.SysCarFaultLogVo; import com.muyu.common.domain.resp.SysCarFaultLogVo;
import com.muyu.common.kafka.config.KafkaProducerConfig; import com.muyu.common.domain.resp.SysCarVo;
import com.muyu.common.redis.service.RedisService;
import com.muyu.server.service.SysCarService; import com.muyu.server.service.SysCarService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List; import java.util.List;
@ -33,12 +31,10 @@ public class SysCarController {
@Autowired @Autowired
private SysCarService sysCarService; private SysCarService sysCarService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired @Autowired
private KafkaProducer kafkaProducer; private SysCarCacheService sysCarCacheService;
/** /**
@ -104,10 +100,10 @@ public class SysCarController {
@PostMapping("/findCarByVin") @PostMapping("/findCarByVin")
@Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息") @Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){ public Result<SysCar> findCarByVin(@RequestParam("carVin") String carVin){
List<SysCarVo> carList = sysCarCacheService.get("carList");
log.info("从redis取出的数据为:"+carList);
return Result.success(sysCarService.findCarByVin(carVin)); return Result.success(sysCarService.findCarByVin(carVin));
} }
} }

View File

@ -11,6 +11,76 @@ nacos:
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all # SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring # Spring
spring: spring:
kafka:
producer:
# Kafka服务器
bootstrap-servers: 150.158.33.234:9092
# 开启事务,必须在开启了事务的方法中发送,否则报错
transaction-id-prefix: kafkaTx-
# 发生错误后消息重发的次数开启事务必须设置大于0。
retries: 3
# acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: all
# 开启事务时必须设置为all
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 生产者内存缓冲区的大小。
buffer-memory: 1024000
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# Kafka服务器
group-id: my-kafka
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式如1S,1M,2H,5D
#auto-commit-interval: 2s
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费分区的记录
# latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据在消费者启动之后生成的记录
# none当各分区都存在已提交的offset时从提交的offset开始消费只要有一个分区不存在已提交的offset则抛出异常
auto-offset-reset: latest
# 是否自动提交偏移量默认值是true为了避免出现重复数据和数据丢失可以把它设置为false然后手动提交偏移量
enable-auto-commit: true
# 键的反序列化方式
#key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 值的反序列化方式建议使用Json这种序列化方式可以无需额外配置传输实体类
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 这个参数定义了poll方法最多可以拉取多少条消息默认值为500。如果在拉取消息的时候新消息不足500条那有多少返回多少如果超过500条每次只返回500。
# 这个默认值在有些场景下太大有些场景很难保证能够在5min内处理完500条消息
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题提前评估好处理一条消息最长需要多少时间然后覆盖默认的max.poll.records参数
# 注需要开启BatchListener批量监听才会生效如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 500
bootstrap-servers: 150.158.33.234:9092
auto-commit-interval: 5000
fetch-max-wait: 500
fetch-min-size: 1
heartbeat-interval: 3000
properties:
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
max:
poll:
interval:
ms: 600000
# 当broker多久没有收到consumer的心跳请求后就触发reBalance默认值是10s
session:
timeout:
ms: 10000
listener:
# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
concurrency: 4
# 自动提交关闭,需要设置手动消息确认
ack-mode: manual_immediate
# 消费监听接口监听的主题不存在时默认会报错所以设置为false忽略错误
missing-topics-fatal: false
# 两次poll之间的最大间隔默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000
mvc: mvc:
pathmatch: pathmatch:
matching-strategy: ant_path_matcher matching-strategy: ant_path_matcher