feat 消费者

master
rouchen 2024-06-11 22:38:47 +08:00
commit 6ae08f8354
18 changed files with 600 additions and 0 deletions

33
.gitignore vendored 100644
View File

@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

81
pom.xml 100644
View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.muyu</groupId>
<artifactId>kafka_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka_demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.muyu.KafkaDemoApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,13 @@
package com.muyu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}

View File

@ -0,0 +1,34 @@
//package com.muyu.demo;
//
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.apache.kafka.common.serialization.StringDeserializer;
//
//import java.util.ArrayList;
//import java.util.Collection;
//import java.util.Properties;
//
///**
// * 消费者 Consumer
// *
// * @author Yangle
// * Date 2024/6/9 10:27
// */
//public class Consumer {
// public static void main(String[] args) {
// Properties properties = new Properties();
// properties.put("bootstrap.servers", "127.0.0.1:9092");
// properties.put("key.deserializer", StringDeserializer.class.getName());
// properties.put("value.deserializer", StringDeserializer.class.getName());
//
// properties.put("group.id", "datou");
// KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
// Collection<String> topics= new ArrayList<>();
// topics.add("da");
// consumer.subscribe(topics);
// while (true) {
// consumer.poll(1000).forEach(record -> {
// System.out.println(record.value());
// });
// }
// }
//}

View File

@ -0,0 +1,38 @@
package com.muyu.demo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
/**
* KafkaConsumerConfig
*
* @author Yangle
* Date 2024/6/11 17:09
*/
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,String> consumerFactory(){
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View File

@ -0,0 +1,19 @@
package com.muyu.demo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
* KafkaMessageListener
*
* @author Yangle
* Date 2024/6/11 17:13
*/
@Service
public class KafkaMessageListener {
@KafkaListener(topics = {"test"})
public void onMessage(String message) {
System.out.println("接收到消息+++++" + message);
}
}

View File

@ -0,0 +1,63 @@
package com.muyu.demo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.SpringApplication;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* PartitionerProducer
*
* @author Yangle
* Date 2024/6/9 10:11
*/
public class PartitionerProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// AdminClient adminClient = AdminClient.create(properties);
kafkaProducer.send(new ProducerRecord<>("test", "ooo","hello1"));
// List<NewTopic> newTopics = new ArrayList<>();
// for (int i = 1; i < 6; i++) {
// String topicName = "hello" + i;
// NewTopic newTopic = new NewTopic(topicName, i, (short) 1);
// newTopics.add(newTopic);
// }
// try {
// // 异步创建主题,并等待所有主题创建完成
// adminClient.createTopics(newTopics).all().get();
//
// // 发送消息到每个主题
// for (NewTopic newTopic : newTopics) {
// String topicName = newTopic.name();
// for (int i = 1; i < 10; i++) {
// // 假设我们使用 i 作为键,并构造一个包含 i 的值
// String key = String.valueOf(i); // 将 int 转换为 String 作为键
// String value = "message_" + i; // 构造一个有意义的值
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, value);
// kafkaProducer.send(producerRecord);
// }
// }
// } catch (InterruptedException | ExecutionException e) {
// // 合并异常处理
// throw new RuntimeException("Failed to create topics or produce messages", e);
// } finally {
// // 确保在finally块中关闭资源
// kafkaProducer.close();
// adminClient.close();
// }
}
}

View File

@ -0,0 +1,10 @@
package com.muyu.demo.kafka;
/**
* DataListen
*
* @author Yangle
* Date 2024/6/11 21:41
*/
public class DataListen {
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.muyu.demos.web;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
@Controller
public class BasicController {
// http://127.0.0.1:8080/hello?name=lisi
@RequestMapping("/hello")
@ResponseBody
public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) {
return "Hello " + name;
}
// http://127.0.0.1:8080/user
@RequestMapping("/user")
@ResponseBody
public User user() {
User user = new User();
user.setName("theonefx");
user.setAge(666);
return user;
}
// http://127.0.0.1:8080/save_user?name=newName&age=11
@RequestMapping("/save_user")
@ResponseBody
public String saveUser(User u) {
return "user will save: name=" + u.getName() + ", age=" + u.getAge();
}
// http://127.0.0.1:8080/html
@RequestMapping("/html")
public String html(){
return "index.html";
}
@ModelAttribute
public void parseUser(@RequestParam(name = "name", defaultValue = "unknown user") String name
, @RequestParam(name = "age", defaultValue = "12") Integer age, User user) {
user.setName("zhangsan");
user.setAge(18);
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.muyu.demos.web;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
@Controller
public class PathVariableController {
// http://127.0.0.1:8080/user/123/roles/222
@RequestMapping(value = "/user/{userId}/roles/{roleId}", method = RequestMethod.GET)
@ResponseBody
public String getLogin(@PathVariable("userId") String userId, @PathVariable("roleId") String roleId) {
return "User Id : " + userId + " Role Id : " + roleId;
}
// http://127.0.0.1:8080/javabeat/somewords
@RequestMapping(value = "/javabeat/{regexp1:[a-z-]+}", method = RequestMethod.GET)
@ResponseBody
public String getRegExp(@PathVariable("regexp1") String regexp1) {
return "URI Part : " + regexp1;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.muyu.demos.web;
/**
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
public class User {
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}

View File

@ -0,0 +1,20 @@
package com.muyu.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* KafkaConsumer
*
* @author Yangle
* Date 2024/6/7 20:06
*/
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"}, groupId = "test")
public void onMessage(String message) {
System.out.println("接收到消息:" + message);
}
}

View File

@ -0,0 +1,26 @@
package com.muyu.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* KafkaController
*
* @author Yangle
* Date 2024/6/7 20:07
*/
@RestController
public class KafkaController {
@Autowired
private KafkaProducerExample kafkaProducerExample;
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
kafkaProducerExample.sendMessage(message);
return "发送消息";
}
}

View File

@ -0,0 +1,31 @@
package com.muyu.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* KafkaProducerExample
*
* @author Yangle
* Date 2024/6/7 20:08
*/
@Service
public class KafkaProducerExample {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public void sendMessage(String message){
if (Objects.nonNull("message")){
kafkaTemplate.send("test",message);
}else {
System.out.println("topic is null");
}
}
}

View File

@ -0,0 +1,18 @@
package com.muyu.kafka;
import java.util.Properties;
/**
* LineSplit
*
* @author Yangle
* Date 2024/6/9 10:01
*/
public class LineSplit {
public static void main(String[] args) {
Properties props = new Properties();
props.put("acks", "all");
}
}

View File

@ -0,0 +1,41 @@
spring:
application:
name: protocol-parsing
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 115.159.211.196
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
acks: all
retries: 0
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: defaultGroup
enable-auto-commit: true
auto-commit-interval: 1000
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 30000
listener:
ack-mode: batch
type: batch
topic:
partitions: 8
server:
port: 8080

View File

@ -0,0 +1,6 @@
<html>
<body>
<h1>hello word!!!</h1>
<p>this is a html page</p>
</body>
</html>

View File

@ -0,0 +1,13 @@
package com.muyu;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class KafkaDemoApplicationTests {
@Test
void contextLoads() {
}
}