From a63bde6c84d18ce991b510ed7dd1039de16e3a35 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=A2=81=E5=AD=90=E9=BE=99?=
<14096380+qwe963852@user.noreply.gitee.com>
Date: Sun, 6 Oct 2024 10:09:48 +0800
Subject: [PATCH] =?UTF-8?q?fix:()kafka=E8=AF=BB=E5=8F=96=E4=B8=8D=E5=88=B0?=
=?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
cloud-common/cloud-common-kafka/pom.xml | 13 ++--
.../kafka/config/KafkaConsumerConfig.java | 10 +--
.../muyu/template/config/MqttConfigure.java | 4 --
.../server/controller/SysCarController.java | 18 ++---
.../src/main/resources/bootstrap.yml | 70 +++++++++++++++++++
5 files changed, 89 insertions(+), 26 deletions(-)
diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml
index 7f5126e..61a34c1 100644
--- a/cloud-common/cloud-common-kafka/pom.xml
+++ b/cloud-common/cloud-common-kafka/pom.xml
@@ -19,21 +19,22 @@
UTF-8
+
+
+
+
+
+
org.apache.kafka
kafka-clients
- 3.0.0
com.muyu
cloud-common-core
-
- org.apache.kafka
- kafka-clients
- 2.8.0
-
+
diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java
index 3eba063..212d27c 100644
--- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java
+++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaConsumerConfig.java
@@ -13,11 +13,11 @@ import java.util.HashMap;
import java.util.Map;
/**
-* kafka消费者配置类
-* @program: cloud-server
-* @author: cuiyongxing
-* @create: 2024-09-28 14:28
-**/
+ * kafka消费者配置类
+ * @program: cloud-server
+ * @author: cuiyongxing
+ * @create: 2024-09-28 14:28
+ **/
@Configuration
public class KafkaConsumerConfig {
diff --git a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
index 7c77cad..a3a6474 100644
--- a/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
+++ b/cloud-modules/cloud-modules-template/src/main/java/com/muyu/template/config/MqttConfigure.java
@@ -3,7 +3,6 @@ import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.muyu.common.domain.MessageTemplateType;
import com.muyu.common.domain.SysCar;
-import com.muyu.common.kafka.config.KafkaProducerConfig;
import com.muyu.common.redis.service.RedisService;
import com.muyu.server.service.MessageTemplateTypeService;
import com.muyu.server.service.SysCarService;
@@ -14,11 +13,8 @@ import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
import java.util.List;
/**
*
diff --git a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java
index 9fa4ddf..7246208 100644
--- a/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java
+++ b/cloud-modules/saas/saas-server/src/main/java/com/muyu/server/controller/SysCarController.java
@@ -1,19 +1,17 @@
package com.muyu.server.controller;
-import com.alibaba.fastjson2.JSONObject;
+import com.muyu.cache.SysCarCacheService;
import com.muyu.common.core.domain.Result;
import com.muyu.common.domain.SysCar;
import com.muyu.common.domain.req.SysCarReq;
import com.muyu.common.domain.resp.SysCarFaultLogVo;
-import com.muyu.common.kafka.config.KafkaProducerConfig;
-import com.muyu.common.redis.service.RedisService;
+import com.muyu.common.domain.resp.SysCarVo;
+
import com.muyu.server.service.SysCarService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@@ -33,12 +31,10 @@ public class SysCarController {
@Autowired
private SysCarService sysCarService;
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
@Autowired
- private KafkaProducer kafkaProducer;
+ private SysCarCacheService sysCarCacheService;
+
/**
@@ -104,10 +100,10 @@ public class SysCarController {
@PostMapping("/findCarByVin")
@Operation(summary = "根据VIN码查询车信息",description = "根据VIN码查询车信息")
public Result findCarByVin(@RequestParam("carVin") String carVin){
-
+ List carList = sysCarCacheService.get("carList");
+ log.info("从redis取出的数据为:"+carList);
return Result.success(sysCarService.findCarByVin(carVin));
}
-
}
diff --git a/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml b/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml
index faf0576..25cea25 100644
--- a/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml
+++ b/cloud-modules/saas/saas-server/src/main/resources/bootstrap.yml
@@ -11,6 +11,76 @@ nacos:
# SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true spring.amqp.deserialization.trust.all
# Spring
spring:
+ kafka:
+ producer:
+ # Kafka服务器
+ bootstrap-servers: 150.158.33.234:9092
+ # 开启事务,必须在开启了事务的方法中发送,否则报错
+ transaction-id-prefix: kafkaTx-
+ # 发生错误后,消息重发的次数,开启事务必须设置大于0。
+ retries: 3
+ # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+ # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+ # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+ acks: all
+ # 开启事务时,必须设置为all
+ # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+ batch-size: 16384
+ # 生产者内存缓冲区的大小。
+ buffer-memory: 1024000
+ # 键的序列化方式
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ consumer:
+ # Kafka服务器
+ group-id: my-kafka
+ # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+ #auto-commit-interval: 2s
+ # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+ # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
+ # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
+ # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
+ auto-offset-reset: latest
+ # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+ enable-auto-commit: true
+ # 键的反序列化方式
+ #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ key-deserializer: org.apache.kafka.common.serialization.StringSerializer
+ # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
+ value-deserializer: org.apache.kafka.common.serialization.StringSerializer
+ # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
+ # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
+ # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
+ # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
+ # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
+ # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
+ max-poll-records: 500
+ bootstrap-servers: 150.158.33.234:9092
+ auto-commit-interval: 5000
+ fetch-max-wait: 500
+ fetch-min-size: 1
+ heartbeat-interval: 3000
+ properties:
+ # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ max:
+ poll:
+ interval:
+ ms: 600000
+ # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
+ session:
+ timeout:
+ ms: 10000
+ listener:
+ # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
+ concurrency: 4
+ # 自动提交关闭,需要设置手动消息确认
+ ack-mode: manual_immediate
+ # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
+ missing-topics-fatal: false
+ # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
+ poll-timeout: 600000
+
mvc:
pathmatch:
matching-strategy: ant_path_matcher