完成kafka测试

dev.protocol.parsing
面包骑士 2024-09-27 22:00:16 +08:00 committed by chentaisen
parent 92ffcd21bd
commit b0f3162ceb
10 changed files with 93 additions and 67 deletions

View File

@ -10,7 +10,7 @@ package com.muyu.common.caffeine.constents;
public class CaffeineContent {
public static final String CAR_VIN_KEY = "car:Vin";
public static final String CAR_VIN_KEY = "car:vin";
public static final String VIN = "vin";
}

View File

@ -18,22 +18,17 @@
</properties>
<dependencies>
<dependency>
<groupId>com.muyu</groupId>
<artifactId>cloud-common-core</artifactId>
<artifactId>cloud-common-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,45 @@
package com.muyu.common.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @Author:
* @Name: KafkaProducerConfig
* @Description: kafka
* @CreatedDate: 2024/9/27 7:38
* @FilePath: com.muyu.common.kafka.config
* kafka
*/
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.116.173.119:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 添加事务相关的配置
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

View File

@ -0,0 +1,16 @@
package com.muyu.common.kafka.constants;
import org.springframework.beans.factory.annotation.Value;
/**
* @Author:
* @date: 2024/7/10
* @Description: kafka
* @Version 1.0.0
*/
public class KafkaConstants {
public final static String KafkaTopic = "kafka_topic";
public final static String KafkaGrop = "kafka_grop";
}

View File

@ -1,20 +0,0 @@
package com.muyu.common.kafka.constents;
import org.springframework.beans.factory.annotation.Value;
/**
* @Author:
* @Name: CaffeineContent
* @Description: Kafka
* @CreatedDate: 2024/9/26 12:06
* @FilePath: com.muyu.common.caffeine.constents
*/
public class KafkaContent {
@Value("${spring.kafka.consumer.group-id}")
public static String GROUP;
@Value("${spring.kafka.template.default-topic}")
public static String TOPIC;
}

View File

@ -1,5 +1,7 @@
package com.muyu.data.processing;
import com.muyu.common.kafka.constants.KafkaConstants;
import com.muyu.common.security.annotation.EnableCustomConfig;
import com.muyu.common.security.annotation.EnableMyFeignClients;
import org.springframework.boot.SpringApplication;
@ -21,6 +23,8 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
public class MyDataApplication {
public static void main(String[] args) {
SpringApplication.run(MyDataApplication.class, args);
System.out.println(KafkaConstants.KafkaGrop);
System.out.println(KafkaConstants.KafkaTopic);
System.out.println("MyData 模块启动成功!");
}

View File

@ -1,11 +1,14 @@
package com.muyu.data.processing.controller;
import com.muyu.data.processing.kafka.KafkaProducerTestService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.muyu.common.kafka.constants.KafkaConstants;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
/**
* @Author:
@ -18,10 +21,20 @@ import lombok.extern.slf4j.Slf4j;
@RestController
@RequestMapping("/Test")
public class TestController {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
@GetMapping("/testKafka")
public void sendMsg(String msg) {
new KafkaProducerTestService().sendMessage(msg);
@Transactional
public void sendMsg(@RequestParam("msg") String msg) {
try {
kafkaTemplate.send(KafkaConstants.KafkaTopic, msg).get();
System.out.println("同步消息发送成功: " + msg);
} catch (Exception e) {
e.printStackTrace();
System.out.println("同步消息发送失败: " + msg);
}
}

View File

@ -1,9 +1,11 @@
package com.muyu.data.processing.kafka;
import com.muyu.common.kafka.constants.KafkaConstants;
import io.swagger.v3.oas.annotations.servers.Server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Author:
@ -14,11 +16,11 @@ import org.springframework.kafka.annotation.KafkaListener;
*/
@Slf4j
@Server
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "iov-car-topic", groupId = "iov-car-group")
private void listen(String msg) {
@KafkaListener(topics = {KafkaConstants.KafkaTopic}, groupId = KafkaConstants.KafkaGrop)
public void listen(String msg) {
log.info("kafka 消费消息:{}", msg);
}
}

View File

@ -1,30 +0,0 @@
package com.muyu.data.processing.kafka;
import io.swagger.v3.oas.annotations.servers.Server;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.CompletableFuture;
/**
* @Author:
* @Name: KafkaProducerTestService
* @Description: kafka
* @CreatedDate: 2024/9/27 9:27
* @FilePath: com.muyu.data.processing.kafka
*/
@Slf4j
@Server
public class KafkaProducerTestService {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String msg) {
CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send("iov-car-topic", msg);
log.info("kafka 发送消息:{}",msg);
}
}