From 6ae08f83541d8be11bd3c64071ae0d6e5499e588 Mon Sep 17 00:00:00 2001 From: rouchen <3133657697@qq.com> Date: Tue, 11 Jun 2024 22:38:47 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E6=B6=88=E8=B4=B9=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 33 ++++++++ pom.xml | 81 +++++++++++++++++++ .../java/com/muyu/KafkaDemoApplication.java | 13 +++ src/main/java/com/muyu/demo/Consumer.java | 34 ++++++++ .../com/muyu/demo/KafkaConsumerConfig.java | 38 +++++++++ .../com/muyu/demo/KafkaMessageListener.java | 19 +++++ .../com/muyu/demo/PartitionerProducer.java | 63 +++++++++++++++ .../java/com/muyu/demo/kafka/DataListen.java | 10 +++ .../com/muyu/demos/web/BasicController.java | 67 +++++++++++++++ .../demos/web/PathVariableController.java | 44 ++++++++++ src/main/java/com/muyu/demos/web/User.java | 43 ++++++++++ .../java/com/muyu/kafka/KafkaConsumer.java | 20 +++++ .../java/com/muyu/kafka/KafkaController.java | 26 ++++++ .../com/muyu/kafka/KafkaProducerExample.java | 31 +++++++ src/main/java/com/muyu/kafka/LineSplit.java | 18 +++++ src/main/resources/application.yml | 41 ++++++++++ src/main/resources/static/index.html | 6 ++ .../com/muyu/KafkaDemoApplicationTests.java | 13 +++ 18 files changed, 600 insertions(+) create mode 100644 .gitignore create mode 100644 pom.xml create mode 100644 src/main/java/com/muyu/KafkaDemoApplication.java create mode 100644 src/main/java/com/muyu/demo/Consumer.java create mode 100644 src/main/java/com/muyu/demo/KafkaConsumerConfig.java create mode 100644 src/main/java/com/muyu/demo/KafkaMessageListener.java create mode 100644 src/main/java/com/muyu/demo/PartitionerProducer.java create mode 100644 src/main/java/com/muyu/demo/kafka/DataListen.java create mode 100644 src/main/java/com/muyu/demos/web/BasicController.java create mode 100644 src/main/java/com/muyu/demos/web/PathVariableController.java create mode 100644 src/main/java/com/muyu/demos/web/User.java create mode 100644 src/main/java/com/muyu/kafka/KafkaConsumer.java create mode 100644 src/main/java/com/muyu/kafka/KafkaController.java create mode 100644 src/main/java/com/muyu/kafka/KafkaProducerExample.java create mode 100644 src/main/java/com/muyu/kafka/LineSplit.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/static/index.html create mode 100644 src/test/java/com/muyu/KafkaDemoApplicationTests.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8b59c68 --- /dev/null +++ b/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + com.muyu + kafka_demo + 0.0.1-SNAPSHOT + kafka_demo + Demo project for Spring Boot + + 1.8 + UTF-8 + UTF-8 + 2.6.13 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + 2.8.11 + + + org.apache.kafka + kafka-clients + 2.8.0 + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + UTF-8 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + com.muyu.KafkaDemoApplication + true + + + + repackage + + repackage + + + + + + + + diff --git a/src/main/java/com/muyu/KafkaDemoApplication.java b/src/main/java/com/muyu/KafkaDemoApplication.java new file mode 100644 index 0000000..9f70dc4 --- /dev/null +++ b/src/main/java/com/muyu/KafkaDemoApplication.java @@ -0,0 +1,13 @@ +package com.muyu; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaDemoApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaDemoApplication.class, args); + } + +} diff --git a/src/main/java/com/muyu/demo/Consumer.java b/src/main/java/com/muyu/demo/Consumer.java new file mode 100644 index 0000000..85afc1c --- /dev/null +++ b/src/main/java/com/muyu/demo/Consumer.java @@ -0,0 +1,34 @@ +//package com.muyu.demo; +// +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.common.serialization.StringDeserializer; +// +//import java.util.ArrayList; +//import java.util.Collection; +//import java.util.Properties; +// +///** +// * 消费者 Consumer +// * +// * @author Yangle +// * Date 2024/6/9 10:27 +// */ +//public class Consumer { +// public static void main(String[] args) { +// Properties properties = new Properties(); +// properties.put("bootstrap.servers", "127.0.0.1:9092"); +// properties.put("key.deserializer", StringDeserializer.class.getName()); +// properties.put("value.deserializer", StringDeserializer.class.getName()); +// +// properties.put("group.id", "datou"); +// KafkaConsumer consumer = new KafkaConsumer<>(properties); +// Collection topics= new ArrayList<>(); +// topics.add("da"); +// consumer.subscribe(topics); +// while (true) { +// consumer.poll(1000).forEach(record -> { +// System.out.println(record.value()); +// }); +// } +// } +//} diff --git a/src/main/java/com/muyu/demo/KafkaConsumerConfig.java b/src/main/java/com/muyu/demo/KafkaConsumerConfig.java new file mode 100644 index 0000000..e44fa92 --- /dev/null +++ b/src/main/java/com/muyu/demo/KafkaConsumerConfig.java @@ -0,0 +1,38 @@ +package com.muyu.demo; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; + +/** + * 消费者的配置类 KafkaConsumerConfig + * + * @author Yangle + * Date 2024/6/11 17:09 + */ +@Configuration +public class KafkaConsumerConfig { + + @Bean + public ConsumerFactory consumerFactory(){ + HashMap props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){ + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/src/main/java/com/muyu/demo/KafkaMessageListener.java b/src/main/java/com/muyu/demo/KafkaMessageListener.java new file mode 100644 index 0000000..f7d70f0 --- /dev/null +++ b/src/main/java/com/muyu/demo/KafkaMessageListener.java @@ -0,0 +1,19 @@ +package com.muyu.demo; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +/** + * 消费者监听 KafkaMessageListener + * + * @author Yangle + * Date 2024/6/11 17:13 + */ +@Service +public class KafkaMessageListener { + + @KafkaListener(topics = {"test"}) + public void onMessage(String message) { + System.out.println("接收到消息+++++:" + message); + } +} diff --git a/src/main/java/com/muyu/demo/PartitionerProducer.java b/src/main/java/com/muyu/demo/PartitionerProducer.java new file mode 100644 index 0000000..7939e97 --- /dev/null +++ b/src/main/java/com/muyu/demo/PartitionerProducer.java @@ -0,0 +1,63 @@ +package com.muyu.demo; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.boot.SpringApplication; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/** + * PartitionerProducer + * + * @author Yangle + * Date 2024/6/9 10:11 + */ +public class PartitionerProducer { + + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "127.0.0.1:9092"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); +// AdminClient adminClient = AdminClient.create(properties); + + kafkaProducer.send(new ProducerRecord<>("test", "ooo","hello1")); +// List newTopics = new ArrayList<>(); +// for (int i = 1; i < 6; i++) { +// String topicName = "hello" + i; +// NewTopic newTopic = new NewTopic(topicName, i, (short) 1); +// newTopics.add(newTopic); +// } + +// try { +// // 异步创建主题,并等待所有主题创建完成 +// adminClient.createTopics(newTopics).all().get(); +// +// // 发送消息到每个主题 +// for (NewTopic newTopic : newTopics) { +// String topicName = newTopic.name(); +// for (int i = 1; i < 10; i++) { +// // 假设我们使用 i 作为键,并构造一个包含 i 的值 +// String key = String.valueOf(i); // 将 int 转换为 String 作为键 +// String value = "message_" + i; // 构造一个有意义的值 +// ProducerRecord producerRecord = new ProducerRecord<>(topicName, key, value); +// kafkaProducer.send(producerRecord); +// } +// } +// } catch (InterruptedException | ExecutionException e) { +// // 合并异常处理 +// throw new RuntimeException("Failed to create topics or produce messages", e); +// } finally { +// // 确保在finally块中关闭资源 +// kafkaProducer.close(); +// adminClient.close(); +// } + } +} diff --git a/src/main/java/com/muyu/demo/kafka/DataListen.java b/src/main/java/com/muyu/demo/kafka/DataListen.java new file mode 100644 index 0000000..ba657f6 --- /dev/null +++ b/src/main/java/com/muyu/demo/kafka/DataListen.java @@ -0,0 +1,10 @@ +package com.muyu.demo.kafka; + +/** + * DataListen + * + * @author Yangle + * Date 2024/6/11 21:41 + */ +public class DataListen { +} diff --git a/src/main/java/com/muyu/demos/web/BasicController.java b/src/main/java/com/muyu/demos/web/BasicController.java new file mode 100644 index 0000000..65e8908 --- /dev/null +++ b/src/main/java/com/muyu/demos/web/BasicController.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.muyu.demos.web; + +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.ModelAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +/** + * @author theonefx + */ +@Controller +public class BasicController { + + // http://127.0.0.1:8080/hello?name=lisi + @RequestMapping("/hello") + @ResponseBody + public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) { + return "Hello " + name; + } + + // http://127.0.0.1:8080/user + @RequestMapping("/user") + @ResponseBody + public User user() { + User user = new User(); + user.setName("theonefx"); + user.setAge(666); + return user; + } + + // http://127.0.0.1:8080/save_user?name=newName&age=11 + @RequestMapping("/save_user") + @ResponseBody + public String saveUser(User u) { + return "user will save: name=" + u.getName() + ", age=" + u.getAge(); + } + + // http://127.0.0.1:8080/html + @RequestMapping("/html") + public String html(){ + return "index.html"; + } + + @ModelAttribute + public void parseUser(@RequestParam(name = "name", defaultValue = "unknown user") String name + , @RequestParam(name = "age", defaultValue = "12") Integer age, User user) { + user.setName("zhangsan"); + user.setAge(18); + } +} diff --git a/src/main/java/com/muyu/demos/web/PathVariableController.java b/src/main/java/com/muyu/demos/web/PathVariableController.java new file mode 100644 index 0000000..e3024a9 --- /dev/null +++ b/src/main/java/com/muyu/demos/web/PathVariableController.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.muyu.demos.web; + +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +/** + * @author theonefx + */ +@Controller +public class PathVariableController { + + // http://127.0.0.1:8080/user/123/roles/222 + @RequestMapping(value = "/user/{userId}/roles/{roleId}", method = RequestMethod.GET) + @ResponseBody + public String getLogin(@PathVariable("userId") String userId, @PathVariable("roleId") String roleId) { + return "User Id : " + userId + " Role Id : " + roleId; + } + + // http://127.0.0.1:8080/javabeat/somewords + @RequestMapping(value = "/javabeat/{regexp1:[a-z-]+}", method = RequestMethod.GET) + @ResponseBody + public String getRegExp(@PathVariable("regexp1") String regexp1) { + return "URI Part : " + regexp1; + } +} diff --git a/src/main/java/com/muyu/demos/web/User.java b/src/main/java/com/muyu/demos/web/User.java new file mode 100644 index 0000000..8e07afb --- /dev/null +++ b/src/main/java/com/muyu/demos/web/User.java @@ -0,0 +1,43 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.muyu.demos.web; + +/** + * @author theonefx + */ +public class User { + + private String name; + + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/src/main/java/com/muyu/kafka/KafkaConsumer.java b/src/main/java/com/muyu/kafka/KafkaConsumer.java new file mode 100644 index 0000000..68375e9 --- /dev/null +++ b/src/main/java/com/muyu/kafka/KafkaConsumer.java @@ -0,0 +1,20 @@ +package com.muyu.kafka; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * 消费者 KafkaConsumer + * + * @author Yangle + * Date 2024/6/7 20:06 + */ +@Component +public class KafkaConsumer { + + @KafkaListener(topics = {"test"}, groupId = "test") + public void onMessage(String message) { + System.out.println("接收到消息:" + message); + } + +} diff --git a/src/main/java/com/muyu/kafka/KafkaController.java b/src/main/java/com/muyu/kafka/KafkaController.java new file mode 100644 index 0000000..de1f6fc --- /dev/null +++ b/src/main/java/com/muyu/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package com.muyu.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +/** + * KafkaController + * + * @author Yangle + * Date 2024/6/7 20:07 + */ +@RestController + +public class KafkaController { + + @Autowired + private KafkaProducerExample kafkaProducerExample; + + @GetMapping("/send/{message}") + public String sendMessage(@PathVariable String message) { + kafkaProducerExample.sendMessage(message); + return "发送消息"; + } +} diff --git a/src/main/java/com/muyu/kafka/KafkaProducerExample.java b/src/main/java/com/muyu/kafka/KafkaProducerExample.java new file mode 100644 index 0000000..b4acdc2 --- /dev/null +++ b/src/main/java/com/muyu/kafka/KafkaProducerExample.java @@ -0,0 +1,31 @@ +package com.muyu.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +/** + * 生产者 KafkaProducerExample + * + * @author Yangle + * Date 2024/6/7 20:08 + */ +@Service +public class KafkaProducerExample { + + @Autowired + private KafkaTemplate kafkaTemplate; + + + public void sendMessage(String message){ + if (Objects.nonNull("message")){ + kafkaTemplate.send("test",message); + }else { + System.out.println("topic is null"); + } + } + +} diff --git a/src/main/java/com/muyu/kafka/LineSplit.java b/src/main/java/com/muyu/kafka/LineSplit.java new file mode 100644 index 0000000..fc540b7 --- /dev/null +++ b/src/main/java/com/muyu/kafka/LineSplit.java @@ -0,0 +1,18 @@ +package com.muyu.kafka; + +import java.util.Properties; + +/** + * LineSplit + * + * @author Yangle + * Date 2024/6/9 10:01 + */ + +public class LineSplit { + public static void main(String[] args) { + Properties props = new Properties(); + + props.put("acks", "all"); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..d2dd5f2 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,41 @@ +spring: + application: + name: protocol-parsing + jackson: + date-format: yyyy-MM-dd HH:mm:ss + time-zone: GMT+8 + rabbitmq: + username: guest + password: guest + virtualHost: / + port: 5672 + host: 115.159.211.196 + listener: + simple: + prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条 + publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) + publisher-returns: true #确认消息已发送到队列(Queue) + kafka: + bootstrap-servers: 127.0.0.1:9092 + producer: + acks: all + retries: 0 + batch-size: 16384 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + group-id: defaultGroup + enable-auto-commit: true + auto-commit-interval: 1000 + auto-offset-reset: latest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session.timeout.ms: 30000 + listener: + ack-mode: batch + type: batch + topic: + partitions: 8 +server: + port: 8080 diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html new file mode 100644 index 0000000..e2d94a2 --- /dev/null +++ b/src/main/resources/static/index.html @@ -0,0 +1,6 @@ + + +

hello word!!!

+

this is a html page

+ + \ No newline at end of file diff --git a/src/test/java/com/muyu/KafkaDemoApplicationTests.java b/src/test/java/com/muyu/KafkaDemoApplicationTests.java new file mode 100644 index 0000000..604b1e7 --- /dev/null +++ b/src/test/java/com/muyu/KafkaDemoApplicationTests.java @@ -0,0 +1,13 @@ +package com.muyu; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class KafkaDemoApplicationTests { + + @Test + void contextLoads() { + } + +}