diff --git a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java index 21fb299..cb1e014 100644 --- a/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java +++ b/cloud-common/cloud-common-caffeine/src/main/java/com/muyu/common/caffeine/constents/CaffeineContent.java @@ -10,7 +10,7 @@ package com.muyu.common.caffeine.constents; public class CaffeineContent { - public static final String CAR_VIN_KEY = "car:Vin"; + public static final String CAR_VIN_KEY = "car:vin"; public static final String VIN = "vin"; } diff --git a/cloud-common/cloud-common-kafka/pom.xml b/cloud-common/cloud-common-kafka/pom.xml index 07a4a75..f4b987e 100644 --- a/cloud-common/cloud-common-kafka/pom.xml +++ b/cloud-common/cloud-common-kafka/pom.xml @@ -18,22 +18,17 @@ + com.muyu - cloud-common-core + cloud-common-redis org.springframework.kafka spring-kafka - 2.2.6.RELEASE - - org.apache.kafka - kafka-clients - 2.1.0 - diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..22dc7b8 --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,45 @@ +package com.muyu.common.kafka.config; + + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * @Author: 胡杨 + * @Name: KafkaProducerConfig + * @Description: kafka生产者配置类 + * @CreatedDate: 2024/9/27 下午7:38 + * @FilePath: com.muyu.common.kafka.config + * kafka生产者配置类 + */ +@Configuration +public class KafkaProducerConfig { + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.116.173.119:9092"); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // 添加事务相关的配置 + configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configProps.put(ProducerConfig.ACKS_CONFIG, "all"); + configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java new file mode 100644 index 0000000..2b89f3c --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constants/KafkaConstants.java @@ -0,0 +1,16 @@ +package com.muyu.common.kafka.constants; + +import org.springframework.beans.factory.annotation.Value; + +/** + * @Author: 胡杨 + * @date: 2024/7/10 + * @Description: kafka常量 + * @Version 1.0.0 + */ +public class KafkaConstants { + + public final static String KafkaTopic = "kafka_topic"; + + public final static String KafkaGrop = "kafka_grop"; +} diff --git a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constents/KafkaContent.java b/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constents/KafkaContent.java deleted file mode 100644 index f9f245c..0000000 --- a/cloud-common/cloud-common-kafka/src/main/java/com/muyu/common/kafka/constents/KafkaContent.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.muyu.common.kafka.constents; - -import org.springframework.beans.factory.annotation.Value; - -/** - * @Author: 胡杨 - * @Name: CaffeineContent - * @Description: Kafka常量 - * @CreatedDate: 2024/9/26 下午12:06 - * @FilePath: com.muyu.common.caffeine.constents - */ - -public class KafkaContent { - - @Value("${spring.kafka.consumer.group-id}") - public static String GROUP; - - @Value("${spring.kafka.template.default-topic}") - public static String TOPIC; -} diff --git a/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..cd59fdb --- /dev/null +++ b/cloud-common/cloud-common-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.muyu.common.kafka.config.KafkaProducerConfig diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java index 05ed5ac..63c774a 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/MyDataApplication.java @@ -1,5 +1,7 @@ package com.muyu.data.processing; +import com.muyu.common.kafka.constants.KafkaConstants; + import com.muyu.common.security.annotation.EnableCustomConfig; import com.muyu.common.security.annotation.EnableMyFeignClients; import org.springframework.boot.SpringApplication; @@ -21,6 +23,8 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; public class MyDataApplication { public static void main(String[] args) { SpringApplication.run(MyDataApplication.class, args); + System.out.println(KafkaConstants.KafkaGrop); + System.out.println(KafkaConstants.KafkaTopic); System.out.println("MyData 模块启动成功!"); } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java index b97b49e..55320b8 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/controller/TestController.java @@ -1,11 +1,14 @@ package com.muyu.data.processing.controller; -import com.muyu.data.processing.kafka.KafkaProducerTestService; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import com.muyu.common.kafka.constants.KafkaConstants; +import jakarta.annotation.Resource; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.messaging.MessageHeaders; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.bind.annotation.*; import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; /** * @Author: 胡杨 @@ -18,10 +21,20 @@ import lombok.extern.slf4j.Slf4j; @RestController @RequestMapping("/Test") public class TestController { + @Resource + private KafkaTemplate kafkaTemplate; @GetMapping("/testKafka") - public void sendMsg(String msg) { - new KafkaProducerTestService().sendMessage(msg); + @Transactional + public void sendMsg(@RequestParam("msg") String msg) { + try { + kafkaTemplate.send(KafkaConstants.KafkaTopic, msg).get(); + System.out.println("同步消息发送成功: " + msg); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("同步消息发送失败: " + msg); + } + } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java index e6f4e3c..6de9775 100644 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java +++ b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaConsumerService.java @@ -1,9 +1,11 @@ package com.muyu.data.processing.kafka; +import com.muyu.common.kafka.constants.KafkaConstants; import io.swagger.v3.oas.annotations.servers.Server; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; /** * @Author: 胡杨 @@ -14,11 +16,11 @@ import org.springframework.kafka.annotation.KafkaListener; */ @Slf4j -@Server +@Component public class KafkaConsumerService { - @KafkaListener(topics = "iov-car-topic", groupId = "iov-car-group") - private void listen(String msg) { + @KafkaListener(topics = {KafkaConstants.KafkaTopic}, groupId = KafkaConstants.KafkaGrop) + public void listen(String msg) { log.info("kafka 消费消息:{}", msg); } } diff --git a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaProducerTestService.java b/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaProducerTestService.java deleted file mode 100644 index 7def85e..0000000 --- a/cloud-data-processing/src/main/java/com/muyu/data/processing/kafka/KafkaProducerTestService.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.muyu.data.processing.kafka; - - -import io.swagger.v3.oas.annotations.servers.Server; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; - -import java.util.concurrent.CompletableFuture; - -/** - * @Author: 胡杨 - * @Name: KafkaProducerTestService - * @Description: kafka测试生产者 - * @CreatedDate: 2024/9/27 上午9:27 - * @FilePath: com.muyu.data.processing.kafka - */ -@Slf4j -@Server -public class KafkaProducerTestService { - - @Resource - private KafkaTemplate kafkaTemplate; - - public void sendMessage(String msg) { - CompletableFuture> send = kafkaTemplate.send("iov-car-topic", msg); - log.info("kafka 发送消息:{}",msg); - } -}