Initialization

master
DongZeLiang 2023-08-25 19:09:17 +08:00
commit de61c36f72
8 changed files with 495 additions and 0 deletions

39
.gitignore vendored 100644

@@ -0,0 +1,39 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
/.idea
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store


@@ -0,0 +1,102 @@
version: "3.6"
services:
  kafka1:
    container_name: kafka1
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '19092:9092'
      - '19093:9093'
    environment:
      # Broker ID, unique per node
      - KAFKA_BROKER_ID=1
      # Security protocol map
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Internal listener address
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka1:9092
      # Advertised external address: host IP and port
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:19092
      # Allow the PLAINTEXT listener
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Broker maximum and initial heap size
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # Set the timezone to UTC+8
      - TZ="Asia/Shanghai"
      # The zook addresses need to be injected via extra_hosts
      - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183
      # Must be identical on all three nodes
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
    volumes:
      - /usr/local/docker/kafka/data/broker01:/bitnami/kafka:rw
    networks:
      - kafka-net
  kafka2:
    container_name: kafka2
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '29092:9092'
      - '29093:9093'
    environment:
      # Broker ID, unique per node
      - KAFKA_BROKER_ID=2
      # Security protocol map
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Internal listener address
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka2:9092
      # Advertised external address: host IP and port
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:29092
      # Allow the PLAINTEXT listener
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Broker maximum and initial heap size
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # Set the timezone to UTC+8
      - TZ="Asia/Shanghai"
      # The zook addresses need to be injected via extra_hosts
      - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183
      # Must be identical on all three nodes
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
    volumes:
      - /usr/local/docker/kafka/data/broker02:/bitnami/kafka:rw
    networks:
      - kafka-net
  kafka3:
    container_name: kafka3
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '39092:9092'
      - '39093:9093'
    environment:
      # Broker ID, unique per node
      - KAFKA_BROKER_ID=3
      # Security protocol map
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Internal listener address
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka3:9092
      # Advertised external address: host IP and port
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://39.100.65.135:39092
      # Allow the PLAINTEXT listener
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Broker maximum and initial heap size
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # Set the timezone to UTC+8
      - TZ="Asia/Shanghai"
      # The zook addresses need to be injected via extra_hosts
      - KAFKA_ZOOKEEPER_CONNECT=10.103.100.1:2181,10.103.100.1:2182,10.103.100.1:2183
      # Must be identical on all three nodes
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
    volumes:
      - /usr/local/docker/kafka/data/broker03:/bitnami/kafka:rw
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge
    name: kafka-net
    ipam:
      driver: default
      config:
        - subnet: 10.100.2.0/24
          gateway: 10.100.2.1
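
The three brokers advertise themselves on the host IP (39.100.65.135) at ports 19092, 29092, and 39092, so clients outside the Docker network must connect through those addresses. Not part of this commit, but a quick way to verify that all three brokers are reachable is an AdminClient round trip; a minimal sketch, assuming the kafka-clients dependency from the pom.xml below (the ClusterCheck class is hypothetical):

package com.muyu.kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;

/**
 * Hypothetical helper, not part of this commit: lists the brokers
 * reachable through the advertised listeners configured above.
 */
public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Advertised listener addresses from the compose file above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "39.100.65.135:19092,39.100.65.135:29092,39.100.65.135:39092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Expect three nodes once the cluster has settled
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("broker " + node.id() + " @ " + node.host() + ":" + node.port());
            }
        }
    }
}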


@@ -0,0 +1,105 @@
version: "3.6"
services:
  knowstreaming-manager:
    image: knowstreaming/knowstreaming-manager:latest
    container_name: knowstreaming-manager
    privileged: true
    restart: always
    depends_on:
      - elasticsearch-single
      - knowstreaming-mysql
    expose:
      - 80
    networks:
      - knowStreaming-net
    command:
      - /bin/sh
      - /ks-start.sh
    environment:
      TZ: Asia/Shanghai
      # MySQL server address
      SERVER_MYSQL_ADDRESS: knowstreaming-mysql:3306
      # MySQL database name
      SERVER_MYSQL_DB: know_streaming
      # MySQL user name
      SERVER_MYSQL_USER: root
      # MySQL user password
      SERVER_MYSQL_PASSWORD: root
      # Elasticsearch server address
      SERVER_ES_ADDRESS: elasticsearch-single:9200
      # JVM options for the service
      JAVA_OPTS: -Xmx2g -Xms1g
    # Service log path
    volumes:
      - /usr/local/docker/know-streaming/data/manage/log:/logs
  knowstreaming-ui:
    image: knowstreaming/knowstreaming-ui:latest
    container_name: knowstreaming-ui
    restart: always
    ports:
      - '80:80'
    networks:
      - knowStreaming-net
    environment:
      TZ: Asia/Shanghai
    depends_on:
      - knowstreaming-manager
  elasticsearch-single:
    image: elasticsearch:7.6.2
    container_name: elasticsearch-single
    restart: always
    expose:
      - 9200
      - 9300
    networks:
      - knowStreaming-net
    environment:
      TZ: Asia/Shanghai
      # JVM options for Elasticsearch
      ES_JAVA_OPTS: -Xms512m -Xmx512m
      # Single-node setup; for a multi-node cluster see https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docker.html#docker-compose-file
      discovery.type: single-node
    # Data persistence path
    volumes:
      - /usr/local/docker/know-streaming/data/es/data:/usr/share/elasticsearch/data
  knowstreaming-init:
    image: knowstreaming/knowstreaming-manager:latest
    container_name: knowstreaming-init
    depends_on:
      - elasticsearch-single
    command:
      - /bin/bash
      - /es_template_create.sh
    environment:
      TZ: Asia/Shanghai
      # Elasticsearch server address
      SERVER_ES_ADDRESS: elasticsearch-single:9200
  knowstreaming-mysql:
    image: knowstreaming/knowstreaming-mysql:latest
    container_name: knowstreaming-mysql
    restart: always
    environment:
      TZ: Asia/Shanghai
      # Password for the root user
      MYSQL_ROOT_PASSWORD: root
      # Database created on initialization
      MYSQL_DATABASE: know_streaming
      # Match any host so root can connect remotely
      MYSQL_ROOT_HOST: '%'
    expose:
      - 3306
    networks:
      - knowStreaming-net
    # Data persistence path
    volumes:
      - /usr/local/docker/know-streaming/data/mysql/data:/data/mysql
networks:
  knowStreaming-net:
    driver: bridge
    name: knowStreaming-net
    ipam:
      driver: default
      config:
        - subnet: 10.100.3.0/24
          gateway: 10.100.3.1


@@ -0,0 +1,77 @@
version: '3.6'
services:
  zook1:
    image: zookeeper:latest
    restart: always
    hostname: zook1
    # Container name, so a meaningful name shows up in Rancher
    container_name: zook1
    # Map the container's default ZooKeeper port to the host
    ports:
      - 2181:2181
    # Mount the data volumes
    volumes:
      - "/usr/local/docker/zookeeper/data/zook1/data:/data"
      - "/usr/local/docker/zookeeper/data/zook1/datalog:/datalog"
      - "/usr/local/docker/zookeeper/data/zook1/logs:/logs"
    environment:
      # ZooKeeper node id, also used as the Kafka broker id
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      zookeeper-net:
        ipv4_address: 10.100.1.2
  zook2:
    image: zookeeper:latest
    restart: always
    hostname: zook2
    # Container name, so a meaningful name shows up in Rancher
    container_name: zook2
    # Map the container's default ZooKeeper port to the host
    ports:
      - 2182:2181
    # Mount the data volumes
    volumes:
      - "/usr/local/docker/zookeeper/data/zook2/data:/data"
      - "/usr/local/docker/zookeeper/data/zook2/datalog:/datalog"
      - "/usr/local/docker/zookeeper/data/zook2/logs:/logs"
    environment:
      # ZooKeeper node id, also used as the Kafka broker id
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      zookeeper-net:
        ipv4_address: 10.100.1.3
  zook3:
    image: zookeeper:latest
    restart: always
    hostname: zook3
    # Container name, so a meaningful name shows up in Rancher
    container_name: zook3
    # Map the container's default ZooKeeper port to the host
    ports:
      - 2183:2181
    # Mount the data volumes
    volumes:
      - "/usr/local/docker/zookeeper/data/zook3/data:/data"
      - "/usr/local/docker/zookeeper/data/zook3/datalog:/datalog"
      - "/usr/local/docker/zookeeper/data/zook3/logs:/logs"
    environment:
      # ZooKeeper node id, also used as the Kafka broker id
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zook1:2888:3888;2181 server.2=zook2:2888:3888;2181 server.3=zook3:2888:3888;2181
    networks:
      zookeeper-net:
        ipv4_address: 10.100.1.4
networks:
  zookeeper-net:
    driver: bridge
    name: zookeeper-net
    ipam:
      driver: default
      config:
        - subnet: 10.100.1.0/24
          gateway: 10.100.1.1

34
pom.xml 100644

@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.muyu</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>


@@ -0,0 +1,57 @@
package com.muyu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;

/**
 * @author DongZl
 * @description: Kafka consumer test
 * @Date 2023/8/25 18:52
 */
public class ConsumerTest {

    private static KafkaConsumer<String, String> consumer;

    /**
     * Build the consumer with auto-commit enabled and subscribe to the topic.
     */
    private static void KfkConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_CON);
        props.put("group.id", "group01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(TOPIC));
    }

    private static void close() {
        consumer.close();
    }

    /**
     * Poll until at least {@code num} records have been collected,
     * returning each record as (offset, key, value).
     */
    private static List<List<Object>> poll(int num) {
        List<List<Object>> result = new ArrayList<>();
        while (result.size() < num) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                result.add(Arrays.asList(record.offset(), record.key(), record.value()));
                System.out.println(record.offset() + " - " + record.key() + " - " + record.value());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        KfkConsumer();
        poll(100);
        close();
    }
}


@@ -0,0 +1,13 @@
package com.muyu.kafka.contents;

/**
 * @author DongZl
 * @description: Kafka constants
 * @Date 2023/8/25 18:47
 */
public class KafkaContent {

    public static final String TOPIC = "top";

    public static final String KAFKA_CON = "39.100.65.135:39092,39.100.65.135:29092,39.100.65.135:19092";
}
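
KafkaContent pins the topic name and the three advertised broker addresses shared by the producer and consumer below. The commit has no topic-creation step, so the topic is either auto-created on first use (depending on the broker's auto.create.topics.enable setting) or must be created up front. A minimal sketch of explicit creation, assuming the same kafka-clients dependency; the TopicInit class, partition count, and replication factor are illustrative, not part of this commit:

package com.muyu.kafka.contents;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;

/**
 * Hypothetical helper, not part of this commit: creates the topic explicitly.
 */
public class TopicInit {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CON);
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: one replica on each broker
            admin.createTopics(List.of(new NewTopic(TOPIC, 3, (short) 3))).all().get();
        }
    }
}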


@@ -0,0 +1,68 @@
package com.muyu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static com.muyu.kafka.contents.KafkaContent.KAFKA_CON;
import static com.muyu.kafka.contents.KafkaContent.TOPIC;

/**
 * @author DongZl
 * @description: Kafka producer test
 * @Date 2023/8/25 18:50
 */
public class ProducerTest {

    private static Producer<String, String> producer;

    public static void KfkProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_CON);
//        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    private static void close() {
        producer.close();
    }

    /**
     * Send one record and block until the broker acknowledges it.
     */
    private static RecordMetadata send(String topic, String key, String value) {
        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
        RecordMetadata meta = null;
        try {
            meta = result.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return meta;
    }

    public static void main(String[] args) {
        KfkProducer();
        new Thread(() -> {
            int i = 0;
            // Send one timestamped record per second until the counter runs out
            do {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                send(TOPIC, UUID.randomUUID().toString(), format);
            } while (i++ < 1000);
            close();
        }).start();
    }
}