commit de61c36f72e0049b58fec326ec541900163cbb20 Author: DongZeLiang <2746733890@qq.com> Date: Fri Aug 25 19:09:17 2023 +0800 初始化 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5793576 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +/.idea +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store diff --git a/compose/kafka/docker-compose.yml b/compose/kafka/docker-compose.yml new file mode 100644 index 0000000..15a990d --- /dev/null +++ b/compose/kafka/docker-compose.yml @@ -0,0 +1,102 @@ +version: "3.6" +services: + kafka1: + container_name: kafka1 + image: 'bitnami/kafka:3.3.1' + user: root + ports: + - '19092:9092' + - '19093:9093' + environment: + # ID + - KAFKA_BROKER_ID=1 + # 定义安全协议 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka1:9092 + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:19092 + # 允许使用PLAINTEXT侦听器 + - ALLOW_PLAINTEXT_LISTENER=yes + # 设置broker最大内存,和初始内存 + - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M + # 设置东八区时区 + - TZ="Asia/Shanghai" + # 需要吧zook使用extra_hosts注入进来 + - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183 + # 三个节点保持一致 + - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI + volumes: + - /usr/local/docker/kafka/data/broker01:/bitnami/kafka:rw + networks: + - kafka-net + kafka2: + container_name: kafka2 + image: 'bitnami/kafka:3.3.1' + user: root + ports: + - '29092:9092' + - '29093:9093' + environment: + # ID + - KAFKA_BROKER_ID=2 + # 定义安全协议 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092 + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:29092 + # 允许使用PLAINTEXT侦听器 + - ALLOW_PLAINTEXT_LISTENER=yes + # 设置broker最大内存,和初始内存 + - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M + # 设置东八区时区 + - TZ="Asia/Shanghai" + # 需要吧zook使用extra_hosts注入进来 + - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183 + # 三个节点保持一致 + - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI + volumes: + - /usr/local/docker/kafka/data/broker02:/bitnami/kafka:rw + networks: + - kafka-net + kafka3: + container_name: kafka3 + image: 'bitnami/kafka:3.3.1' + user: root + ports: + - '39092:9092' + - '39093:9093' + environment: + # ID + - KAFKA_BROKER_ID=3 + # 定义安全协议 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka3:9092 + # 定义外网访问地址(宿主机ip地址和端口) + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:39092 + # 允许使用PLAINTEXT侦听器 + - ALLOW_PLAINTEXT_LISTENER=yes + # 设置broker最大内存,和初始内存 + - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M + # 设置东八区时区 + - TZ="Asia/Shanghai" + # 需要吧zook使用extra_hosts注入进来 + - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183 + # 三个节点保持一致 + - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI + volumes: + - /usr/local/docker/kafka/data/broker03:/bitnami/kafka:rw + networks: + - kafka-net +networks: + name: + kafka-net: + driver: bridge + name: kafka-net + ipam: + driver: default + config: + - subnet: 10.100.2.0/24 + gateway: 10.100.2.1 diff --git a/compose/know-streaming/docker-compose.yml b/compose/know-streaming/docker-compose.yml new file mode 100644 index 0000000..6794412 --- /dev/null +++ b/compose/know-streaming/docker-compose.yml @@ -0,0 +1,105 @@ +version: "3.6" +services: + knowstreaming-manager: + image: knowstreaming/knowstreaming-manager:latest + container_name: knowstreaming-manager + privileged: true + restart: always + depends_on: + - elasticsearch-single + - knowstreaming-mysql + expose: + - 80 + networks: + - knowStreaming-net + command: + - /bin/sh + - /ks-start.sh + environment: + TZ: Asia/Shanghai + # mysql服务地址 + SERVER_MYSQL_ADDRESS: knowstreaming-mysql:3306 + # mysql数据库名 + SERVER_MYSQL_DB: know_streaming + # mysql用户名 + SERVER_MYSQL_USER: root + # mysql用户密码 + SERVER_MYSQL_PASSWORD: root + # es服务地址 + SERVER_ES_ADDRESS: elasticsearch-single:9200 + # 服务JVM参数 + JAVA_OPTS: -Xmx2g -Xms1g + # 服务日志路径 + volumes: + - /usr/local/docker/know-streaming/data/manage/log:/logs + knowstreaming-ui: + image: knowstreaming/knowstreaming-ui:latest + container_name: knowstreaming-ui + restart: always + ports: + - '80:80' + networks: + - knowStreaming-net + environment: + TZ: Asia/Shanghai + depends_on: + - knowstreaming-manager + elasticsearch-single: + image: elasticsearch:7.6.2 + container_name: elasticsearch-single + restart: always + expose: + - 9200 + - 9300 + networks: + - knowStreaming-net + environment: + TZ: Asia/Shanghai + # es的JVM参数 + ES_JAVA_OPTS: -Xms512m -Xmx512m + # 单节点配置,多节点集群参考 https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docker.html#docker-compose-file + discovery.type: single-node + # 数据持久化路径 + volumes: + - /usr/local/docker/know-streaming/data/es/data:/usr/share/elasticsearch/data + knowstreaming-init: + image: knowstreaming/knowstreaming-manager:latest + container_name: knowstreaming-init + depends_on: + - elasticsearch-single + command: + - /bin/bash + - /es_template_create.sh + environment: + TZ: Asia/Shanghai + # es服务地址 + SERVER_ES_ADDRESS: elasticsearch-single:9200 + knowstreaming-mysql: + image: knowstreaming/knowstreaming-mysql:latest + container_name: knowstreaming-mysql + restart: always + environment: + TZ: Asia/Shanghai + # root 用户密码 + MYSQL_ROOT_PASSWORD: root + # 初始化时创建的数据库名称 + MYSQL_DATABASE: know_streaming + # 通配所有host,可以访问远程 + MYSQL_ROOT_HOST: '%' + expose: + - 3306 + networks: + - knowStreaming-net + # 数据持久化路径 + volumes: + - /usr/local/docker/know-streaming/data/mysql/data:/data/mysql +networks: + name: + knowStreaming-net: + driver: bridge + name: knowStreaming-net + ipam: + driver: default + config: + - subnet: 10.100.3.0/24 + gateway: 10.100.3.1 diff --git a/compose/zookeeper/docker-compose.yml b/compose/zookeeper/docker-compose.yml new file mode 100644 index 0000000..ea145ca --- /dev/null +++ b/compose/zookeeper/docker-compose.yml @@ -0,0 +1,77 @@ +version: '3.6' + +services: + zook1: + image: zookeeper:latest + restart: always + hostname: zook1 + #容器名称,方便在rancher中显示有意义的名称 + container_name: zook1 + #将本容器的zookeeper默认端口号映射出去 + ports: + - 2181:2181 + # 挂载数据卷 + volumes: + - "/usr/local/docker/zookeeper/data/zook1/data:/data" + - "/usr/local/docker/zookeeper/data/zook1/datalog:/datalog" + - "/usr/local/docker/zookeeper/data/zook1/logs:/logs" + environment: + #即是zookeeper的节点值,也是kafka的brokerid值 + ZOO_MY_ID: 1 + ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181 + networks: + zookeeper-net: + ipv4_address: 10.100.1.2 + + zook2: + image: zookeeper:latest + restart: always + hostname: zook2 + #容器名称,方便在rancher中显示有意义的名称 + container_name: zook2 + #将本容器的zookeeper默认端口号映射出去 + ports: + - 2182:2181 + #将本容器的zookeeper默认端口号映射出去 + volumes: + - "/usr/local/docker/zookeeper/data/zook2/data:/data" + - "/usr/local/docker/zookeeper/data/zook2/datalog:/datalog" + - "/usr/local/docker/zookeeper/data/zook2/logs:/logs" + environment: + #即是zookeeper的节点值,也是kafka的brokerid值 + ZOO_MY_ID: 2 + ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181 + networks: + zookeeper-net: + ipv4_address: 10.100.1.3 + + zook3: + image: zookeeper:latest + restart: always + hostname: zook3 + #容器名称,方便在rancher中显示有意义的名称 + container_name: zook3 + #将本容器的zookeeper默认端口号映射出去 + ports: + - 2183:2181 + volumes: + - "/usr/local/docker/zookeeper/data/zook3/data:/data" + - "/usr/local/docker/zookeeper/data/zook3/datalog:/datalog" + - "/usr/local/docker/zookeeper/data/zook3/logs:/logs" + environment: + #即是zookeeper的节点值,也是kafka的brokerid值 + ZOO_MY_ID: 3 + ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181 + networks: + zookeeper-net: + ipv4_address: 10.100.1.4 +networks: + name: + zookeeper-net: + driver: bridge + name: zookeeper-net + ipam: + driver: default + config: + - subnet: 10.100.1.0/24 + gateway: 10.100.1.1 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..92361ec --- /dev/null +++ b/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + com.muyu + kafka + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + org.springframework.boot + spring-boot-starter-parent + 2.7.7 + + + + + org.apache.kafka + kafka-clients + 3.3.1 + + + org.springframework.boot + spring-boot-starter-web + + + diff --git a/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java b/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java new file mode 100644 index 0000000..f24592e --- /dev/null +++ b/src/main/java/com/muyu/kafka/consumer/ConsumerTest.java @@ -0,0 +1,57 @@ +package com.muyu.kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; +import static com.muyu.kafka.contents.KafkaContent.TOPIC; + +/** + * @author DongZl + * @description: 消费者测试类 + * @Date 2023/8/25 18:52 + */ +public class ConsumerTest { + private static KafkaConsumer consumer; + + private static void KfkConsumer() { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); + props.put("group.id", "group01"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(TOPIC)); + } + + private static void close() { + consumer.close(); + } + + private static List> poll(int num) { + List> result = new ArrayList<>(); + while (result.size() < num) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + result.add(Arrays.asList(record.offset(), record.key(), record.value())); + System.out.println(record.offset() +" - "+ record.key() +" - "+ record.value()); + } + } + return result; + } + + public static void main (String[] args) { + KfkConsumer(); + poll(100); + close(); + } +} diff --git a/src/main/java/com/muyu/kafka/contents/KafkaContent.java b/src/main/java/com/muyu/kafka/contents/KafkaContent.java new file mode 100644 index 0000000..485d25a --- /dev/null +++ b/src/main/java/com/muyu/kafka/contents/KafkaContent.java @@ -0,0 +1,13 @@ +package com.muyu.kafka.contents; + +/** + * @author DongZl + * @description: kafka常量类 + * @Date 2023/8/25 18:47 + */ +public class KafkaContent { + + public static final String TOPIC = "top"; + + public static final String KAFKA_CON = "39.100.65.135:39092,39.100.65.135:29092,39.100.65.135:19092"; +} diff --git a/src/main/java/com/muyu/kafka/producer/ProducerTest.java b/src/main/java/com/muyu/kafka/producer/ProducerTest.java new file mode 100644 index 0000000..f9c0303 --- /dev/null +++ b/src/main/java/com/muyu/kafka/producer/ProducerTest.java @@ -0,0 +1,68 @@ +package com.muyu.kafka.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON; +import static com.muyu.kafka.contents.KafkaContent.TOPIC; + +/** + * @author DongZl + * @description: 生产者测试 + * @Date 2023/8/25 18:50 + */ +public class ProducerTest { + private static Producer producer; + + public static void KfkProducer() { + Properties props = new Properties(); + props.put("bootstrap.servers", KAFKA_CON); +// props.put("linger.ms", 1); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(props); + } + + private static void close() { + producer.close(); + } + + private static RecordMetadata send(String topic, String key, String value) { + Future result = producer.send(new ProducerRecord<>(topic, key, value)); + RecordMetadata meta = null; + try { + meta = result.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return meta; + } + + public static void main (String[] args) { + KfkProducer(); + + new Thread(() -> { + int i = 0; + do { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + send(TOPIC, UUID.randomUUID().toString(), format); + }while (i++ < 1000); + close(); + }).start(); + + } +}