Kubernetes 部署 Kafka & Zookeeper & Kafka Manager
文章目录
!版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。
系统环境:
- Kubernetes 版本:1.14.0
- kafka 版本:2.3.0
- zookeeper 版本:3.4.14
- kafka manager 版本:1.3.3
- 示例部署文件 Github 地址:https://github.com/my-dlq/blog-example/tree/master/kubernetes/kafka-zookeeper-deploy
一、简介
Kafka 简介
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。它是一个分布式、支持分区的、多副本,基于 zookeeper 协调的分布式消息系统。它最大特性是可以实时的处理大量数据以满足各种需求场景,比如基于 Hadoop 的批处理系统、低延时的实时系统、storm/Spark 流式处理引擎,web/nginx 日志,访问日志,消息服务等等。
Zookeeper 简介
ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。 ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。
Kafka 中主要利用 zookeeper 解决分布式一致性问题。Kafka 使用 Zookeeper 的分布式协调服务将生产者,消费者,消息储存结合在一起。同时借助 Zookeeper,Kafka 能够将生产者、消费者和 Broker 在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡。
Kafka Manager 简介
Kafka Manager 是目前最受欢迎的 Kafka 集群管理工具,最早由雅虎开源,用户可以在 Web 界面执行一些简单的集群管理操作。
支持以下功能:
- 管理 Kafka 集群
- 方便集群状态监控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
- 方便选择分区副本
- 配置分区任务,包括选择使用哪些 Brokers
- 可以对分区任务重分配
- 提供不同的选项来创建及删除 Topic
- Topic List 会指明哪些topic被删除
- 批量产生分区任务并且和多个topic和brokers关联
- 批量运行多个主题对应的多个分区
- 向已经存在的主题中添加分区
- 对已经存在的 Topic 修改配置
- 可以在 Broker Level 和 Topic Level 的度量中启用 JMX Polling 功能
- 可以过滤在 ZooKeeper 上没有 ids/owners/offsets/directories 的 consumer
二、部署过程
这个流程需要部署三个组件,分别为 Zookeeper、Kafka、Kafka Manager:
- (1)、Zookeeper: 首先部署 Zookeeper,方便后续部署 Kafka 节点注册到 Zookeeper,用 StatefulSet 方式部署三个节点。
- (2)、Kafka: 第二个部署的是 Kafka,设置环境变量来指定 Zookeeper 地址,用 StatefulSet 方式部署。
- (3)、Kafka Manager: 最后部署的是 Kafka Manager,用 Deployment 方式部署,然后打开 Web UI 界面来管理、监控 Kafka。
三、Kubernetes 部署 Zookeeper & Kafka & Kafka Manager
1、创建 StorageClass
由于都是使用 StatefulSet 方式部署的有状态服务,所以 Kubernetes 集群需要提前设置一个 StorageClass 方便后续部署时指定存储分配(如果想指定为已经存在的 StorageClass 创建 PV 则跳过此步骤)。
此处用的是 NFS 存储驱动,如果是其它存储需要提前设置好相关配置
创建 StorageClass 部署文件
nfs-storage.yaml
1apiVersion: storage.k8s.io/v1
2kind: StorageClass
3metadata:
4 name: nfs-storage
5provisioner: nfs-client #动态卷分配服务指定的名称
6parameters:
7 archiveOnDelete: "true" #设置为"false"时删除PVC不会保留数据,"true"则保留数据
8mountOptions:
9 - hard #指定为硬挂载方式
10 - nfsvers=4 #指定NFS版本
部署 StorageClass
1$ kubectl apply -f nfs-storage.yaml
2、Kubernetes 部署 Zookeeper
创建 Zookeeper 部署文件
zookeeper.yaml
1#部署 Service Headless,用于Zookeeper间相互通信
2apiVersion: v1
3kind: Service
4metadata:
5 name: zookeeper-headless
6 labels:
7 app: zookeeper
8spec:
9 type: ClusterIP
10 clusterIP: None
11 publishNotReadyAddresses: true
12 ports:
13 - name: client
14 port: 2181
15 targetPort: client
16 - name: follower
17 port: 2888
18 targetPort: follower
19 - name: election
20 port: 3888
21 targetPort: election
22 selector:
23 app: zookeeper
24---
25#部署 Service,用于外部访问 Zookeeper
26apiVersion: v1
27kind: Service
28metadata:
29 name: zookeeper
30 labels:
31 app: zookeeper
32spec:
33 type: ClusterIP
34 ports:
35 - name: client
36 port: 2181
37 targetPort: client
38 - name: follower
39 port: 2888
40 targetPort: follower
41 - name: election
42 port: 3888
43 targetPort: election
44 selector:
45 app: zookeeper
46---
47apiVersion: apps/v1
48kind: StatefulSet
49metadata:
50 name: zookeeper
51 labels:
52 app: zookeeper
53spec:
54 serviceName: zookeeper-headless
55 replicas: 3
56 podManagementPolicy: Parallel
57 updateStrategy:
58 type: RollingUpdate
59 selector:
60 matchLabels:
61 app: zookeeper
62 template:
63 metadata:
64 name: zookeeper
65 labels:
66 app: zookeeper
67 spec:
68 securityContext:
69 fsGroup: 1001
70 containers:
71 - name: zookeeper
72 image: docker.io/bitnami/zookeeper:3.4.14-debian-9-r25
73 imagePullPolicy: IfNotPresent
74 securityContext:
75 runAsUser: 1001
76 command:
77 - bash
78 - -ec
79 - |
80 # Execute entrypoint as usual after obtaining ZOO_SERVER_ID based on POD hostname
81 HOSTNAME=`hostname -s`
82 if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
83 ORD=${BASH_REMATCH[2]}
84 export ZOO_SERVER_ID=$((ORD+1))
85 else
86 echo "Failed to get index from hostname $HOST"
87 exit 1
88 fi
89 . /opt/bitnami/base/functions
90 . /opt/bitnami/base/helpers
91 print_welcome_page
92 . /init.sh
93 nami_initialize zookeeper
94 exec tini -- /run.sh
95 resources:
96 limits:
97 cpu: 500m
98 memory: 512Mi
99 requests:
100 cpu: 250m
101 memory: 256Mi
102 env:
103 - name: ZOO_PORT_NUMBER
104 value: "2181"
105 - name: ZOO_TICK_TIME
106 value: "2000"
107 - name: ZOO_INIT_LIMIT
108 value: "10"
109 - name: ZOO_SYNC_LIMIT
110 value: "5"
111 - name: ZOO_MAX_CLIENT_CNXNS
112 value: "60"
113 - name: ZOO_SERVERS
114 value: "
115 zookeeper-0.zookeeper-headless:2888:3888,
116 zookeeper-1.zookeeper-headless:2888:3888,
117 zookeeper-2.zookeeper-headless:2888:3888
118 "
119 - name: ZOO_ENABLE_AUTH
120 value: "no"
121 - name: ZOO_HEAP_SIZE
122 value: "1024"
123 - name: ZOO_LOG_LEVEL
124 value: "ERROR"
125 - name: ALLOW_ANONYMOUS_LOGIN
126 value: "yes"
127 ports:
128 - name: client
129 containerPort: 2181
130 - name: follower
131 containerPort: 2888
132 - name: election
133 containerPort: 3888
134 livenessProbe:
135 tcpSocket:
136 port: client
137 initialDelaySeconds: 30
138 periodSeconds: 10
139 timeoutSeconds: 5
140 successThreshold: 1
141 failureThreshold: 6
142 readinessProbe:
143 tcpSocket:
144 port: client
145 initialDelaySeconds: 5
146 periodSeconds: 10
147 timeoutSeconds: 5
148 successThreshold: 1
149 failureThreshold: 6
150 volumeMounts:
151 - name: data
152 mountPath: /bitnami/zookeeper
153 volumeClaimTemplates:
154 - metadata:
155 name: data
156 annotations:
157 spec:
158 storageClassName: nfs-storage #指定为上面创建的 storageclass
159 accessModes:
160 - ReadWriteOnce
161 resources:
162 requests:
163 storage: 5Gi
部署 Zookeeper
- -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f zookeeper.yaml -n mydlqcloud
3、Kubernetes 部署 Kafka
创建 Kafka 部署文件
kafka.yaml
1#部署 Service Headless,用于Kafka间相互通信
2apiVersion: v1
3kind: Service
4metadata:
5 name: kafka-headless
6 labels:
7 app: kafka
8spec:
9 type: ClusterIP
10 clusterIP: None
11 ports:
12 - name: kafka
13 port: 9092
14 targetPort: kafka
15 selector:
16 app: kafka
17---
18#部署 Service,用于外部访问 Kafka
19apiVersion: v1
20kind: Service
21metadata:
22 name: kafka
23 labels:
24 app: kafka
25spec:
26 type: ClusterIP
27 ports:
28 - name: kafka
29 port: 9092
30 targetPort: kafka
31 selector:
32 app: kafka
33---
34apiVersion: apps/v1
35kind: StatefulSet
36metadata:
37 name: "kafka"
38 labels:
39 app: kafka
40spec:
41 selector:
42 matchLabels:
43 app: kafka
44 serviceName: kafka-headless
45 podManagementPolicy: "Parallel"
46 replicas: 3
47 updateStrategy:
48 type: "RollingUpdate"
49 template:
50 metadata:
51 name: "kafka"
52 labels:
53 app: kafka
54 spec:
55 securityContext:
56 fsGroup: 1001
57 runAsUser: 1001
58 containers:
59 - name: kafka
60 image: "docker.io/bitnami/kafka:2.3.0-debian-9-r4"
61 imagePullPolicy: "IfNotPresent"
62 resources:
63 limits:
64 cpu: 500m
65 memory: 512Mi
66 requests:
67 cpu: 250m
68 memory: 256Mi
69 env:
70 - name: MY_POD_IP
71 valueFrom:
72 fieldRef:
73 fieldPath: status.podIP
74 - name: MY_POD_NAME
75 valueFrom:
76 fieldRef:
77 fieldPath: metadata.name
78 - name: KAFKA_CFG_ZOOKEEPER_CONNECT
79 value: "zookeeper" #Zookeeper Service 名称
80 - name: KAFKA_PORT_NUMBER
81 value: "9092"
82 - name: KAFKA_CFG_LISTENERS
83 value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"
84 - name: KAFKA_CFG_ADVERTISED_LISTENERS
85 value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'
86 - name: ALLOW_PLAINTEXT_LISTENER
87 value: "yes"
88 - name: KAFKA_HEAP_OPTS
89 value: "-Xmx512m -Xms512m"
90 - name: KAFKA_CFG_LOGS_DIRS
91 value: /opt/bitnami/kafka/data
92 - name: JMX_PORT
93 value: "9988"
94 ports:
95 - name: kafka
96 containerPort: 9092
97 livenessProbe:
98 tcpSocket:
99 port: kafka
100 initialDelaySeconds: 10
101 periodSeconds: 10
102 timeoutSeconds: 5
103 successThreshold: 1
104 failureThreshold: 2
105 readinessProbe:
106 tcpSocket:
107 port: kafka
108 initialDelaySeconds: 5
109 periodSeconds: 10
110 timeoutSeconds: 5
111 successThreshold: 1
112 failureThreshold: 6
113 volumeMounts:
114 - name: data
115 mountPath: /bitnami/kafka
116 volumeClaimTemplates:
117 - metadata:
118 name: data
119 spec:
120 storageClassName: nfs-storage #指定为上面创建的 storageclass
121 accessModes:
122 - "ReadWriteOnce"
123 resources:
124 requests:
125 storage: 5Gi
部署 Kafka
- -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f kafka.yaml -n mydlqcloud
4、Kubernetes 部署 Kafka Manager
创建 Kafka Manager 部署文件
kafka-manager.yaml
1apiVersion: v1
2kind: Service
3metadata:
4 name: kafka-manager
5 labels:
6 app: kafka-manager
7spec:
8 type: NodePort
9 ports:
10 - name: kafka
11 port: 9000
12 targetPort: 9000
13 nodePort: 30900
14 selector:
15 app: kafka-manager
16---
17apiVersion: apps/v1
18kind: Deployment
19metadata:
20 name: kafka-manager
21 labels:
22 app: kafka-manager
23spec:
24 replicas: 1
25 selector:
26 matchLabels:
27 app: kafka-manager
28 template:
29 metadata:
30 labels:
31 app: kafka-manager
32 spec:
33 containers:
34 - name: kafka-manager
35 image: zenko/kafka-manager:1.3.3.22
36 imagePullPolicy: IfNotPresent
37 ports:
38 - name: kafka-manager
39 containerPort: 9000
40 protocol: TCP
41 env:
42 - name: ZK_HOSTS
43 value: "zookeeper:2181"
44 livenessProbe:
45 httpGet:
46 path: /api/health
47 port: kafka-manager
48 readinessProbe:
49 httpGet:
50 path: /api/health
51 port: kafka-manager
52 resources:
53 limits:
54 cpu: 500m
55 memory: 512Mi
56 requests:
57 cpu: 250m
58 memory: 256Mi
部署 Kafka Manager
- -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f kafka-manager.yaml -n mydlqcloud
四、进入 Kafka Manager 管理 Kafka 集群
这里的 Kubernetes 集群地址为:192.168.2.11,并且在上面设置 Kafka-Manager 网络策略为 NodePort 方式,且设置端口为 30900,这里输入地址:http://192.168.2.11:30900 访问 Kafka Manager。
进入后先配置 Kafka Manager,增加一个 Zookeeper 地址。
配置三个必填参数:
- Cluster Name:自定义一个名称,任意输入即可。
- Zookeeper Hosts:输入 Zookeeper 地址,这里设置为 Zookeeper 服务名+端口。
- Kafka Version:选择 kafka 版本。
配置完成后就可以看到新增了一条记录,点进去就可以查看相关集群信息。
五、附录:镜像参数配置
Zookeeper 镜像可配置变量参数
- 镜像 github 地址:https://github.com/bitnami/bitnami-docker-zookeeper
参数名称 | 描述 |
---|---|
ZOO_PORT_NUMBER | Zookeeper客户端端口。默认值:2181 |
ZOO_SERVER_ID | 集合中服务器的ID。默认值:1 |
ZOO_TICK_TIME | ZooKeeper用于心跳的基本时间单位(以毫秒为单位)。默认值:2000 |
ZOO_INIT_LIMIT | ZooKeeper用于限制仲裁中ZooKeeper服务器连接到领导者的时间长度。默认值:10 |
ZOO_SYNC_LIMIT | 服务器与领导者的过时距离。默认值:5 |
ZOO_MAX_CLIENT_CNXNS | 限制单个客户端可能对ZooKeeper集合的单个成员进行的并发连接数。默认60 |
ZOO_SERVERS | 逗号,空格或冒号分隔的服务器列表。示例:zoo1:2888:3888,zoo2:2888:3888。没有默认值。 |
ZOO_CLIENT_USER | 将使用Zookeeper客户端进行身份验证的用户。默认值:无默认值。 |
ZOO_CLIENT_PASSWORD | 将使用Zookeeper客户端进行身份验证的密码。没有默认值。 |
ZOO_SERVER_USERS | 逗号,分号或空格分隔的要创建的用户列表。示例:user1,user2,admin。没有默认值 |
ZOO_SERVER_PASSWORDS | 逗号,半精或空格分隔的密码列表,在创建时分配给用户。示例:pass4user1,pass4user2,pass4admin。没有默认值 |
ZOO_ENABLE_AUTH | 启用Zookeeper身份验证。它使用SASL / Digest-MD5。默认值:否 |
ZOO_RECONFIG_ENABLED | 启用ZooKeeper动态重配置。默认值:否 |
ZOO_HEAP_SIZE | Java堆选项(Xmx和XM)的大小(MB)。如果通过Xmx配置Xm,则忽略此env var JVMFLAGS。默认值:1024 |
ZOO_LOG_LEVEL | Zookeeper日志级别。可用级别为:ALL,DEBUG,INFO,WARN,ERROR,FATAL,OFF,TRACE。默认值:INFO |
ALLOW_ANONYMOUS_LOGIN | 如果设置为true,则允许接受来自未经身份验证的用户的连接。默认值:否 |
JVMFLAGS | ZooKeeper进程的默认JVMFLAGS。没有默认值 |
Kafka 镜像可配置变量参数
- 镜像 github 地址:https://github.com/bitnami/bitnami-docker-kafka
参数名称 | 描述 |
---|---|
KAFKA_CFG_ZOOKEEPER_CONNECT | Zookeeper集群地址,例如"zookeeper:2181" |
KAFKA_PORT_NUMBER | Kafka端口,例如"9092" |
ALLOW_PLAINTEXT_LISTENER | 是否启用Plaintext侦听器,默认"false" |
KAFKA_CFG_LISTENERS | Kafka 监听列表,broker对外提供服务时绑定的IP和端口 |
KAFKA_CFG_ADVERTISED_LISTENERS | 给客户端用的发布至zookeeper的监听,broker 会上送此地址到zookeeper,zookeeper会将此地址提供给消费者,消费者根据此地址获取消息。 |
KAFKA_HEAP_OPTS | Java JVM堆内存大小配置,例如"-Xmx512m -Xms512m" |
KAFKA_CFG_LOGS_DIRS | Kafka 日志存储目录 |
JMX_PORT | JMX端口配置,设置此参数才能开启JMX,例如设置为"9988" |
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM | 是否启用基于主机名的认证认证,例如想开启可以设置为"https" |
KAFKA_CFG_BROKER_ID | Broker ID值,每个节点值都唯一。默认为"-1",自动生成BrokerId |
KAFKA_CFG_DELETE_TOPIC_ENABLE | 是否允许删除Topic,默认"false" |
KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES | 此项配置指定时间间隔,强制进行fsync日志,默认"10000" |
KAFKA_CFG_LOG_FLUSH_INTERVAL_MS | 此项配置用来置顶强制进行fsync日志到磁盘的时间间隔"1000" |
KAFKA_CFG_LOG_RETENTION_BYTES | 每个Topic下每个Partition保存数据的总量,超过限制都会删除一个段文件,默认"1073741824" |
KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS | 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求,默认"300000" |
KAFKA_CFG_LOG_RETENTION_HOURS | 每个日志文件删除之前保存的时间,默认数据保存时间对所有topic都一样,默认"168" |
KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION | 指定broker将用于将消息添加到日志文件的消息格式版本 |
KAFKA_CFG_MAX_MESSAGE_BYTES | Kafka允许的最大记录批大小。默认"1000000" |
KAFKA_CFG_SEGMENT_BYTES | 日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。默认"1073741824" |
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR | 对replica的数目进行配置,默认值为"1",表示不对topic进行备份。如果配置为2,表示除了leader节点,对于topic里的每一个partition,都会有一个额外的备份。 |
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Topic的offset的备份份数。建议设置更高的数字保证更高的可用性 |
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | 事务Topic的复制因子(设置得更高以确保可用性) |
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | 事务Topic的副本数 |
KAFKA_CFG_NUM_IO_THREADS | 用来处理请求的I/O线程的数目 |
KAFKA_CFG_NUM_NETWORK_THREADS | 用来处理网络请求的网络线程数目 |
KAFKA_CFG_NUM_PARTITIONS | 每个主题的日志分区的默认数量 |
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR | 每个数据目录中的线程数,用于在启动时日志恢复,并在关闭时刷新。默认"1" |
KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES | SO_RCVBUFF缓存大小,server进行socket连接时所用 |
KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES | 允许的最大请求尺寸,这将避免server溢出,它应该小于Java heap size,默认"104857600" |
KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES | Kafka追加消息的最大尺寸 |
KAFKA_CFG_ZOOKEEPER_CONNECT_TIMEOUT_MS | 连接到zookeeper的超时时间(ms) |
---END---
!版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。