Kubernetes 部署 Kafka & Zookeeper & Kafka Manager

Kubernetes 部署 Kafka & Zookeeper & Kafka Manager

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。


系统环境:

  • Kubernetes 版本:1.14.0
  • kafka 版本:2.3.0
  • zookeeper 版本:3.4.14
  • kafka manager 版本:1.3.3
  • 示例部署文件 Github 地址:https://github.com/my-dlq/blog-example/tree/master/kubernetes/kafka-zookeeper-deploy

一、简介

Kafka 简介

       Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。它是一个分布式、支持分区的、多副本,基于 zookeeper 协调的分布式消息系统。它最大特性是可以实时的处理大量数据以满足各种需求场景,比如基于 Hadoop 的批处理系统、低延时的实时系统、storm/Spark 流式处理引擎,web/nginx 日志,访问日志,消息服务等等。

Zookeeper 简介

       ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。 ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

       Kafka 中主要利用 zookeeper 解决分布式一致性问题。Kafka 使用 Zookeeper 的分布式协调服务将生产者,消费者,消息储存结合在一起。同时借助 Zookeeper,Kafka 能够将生产者、消费者和 Broker 在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡。

Kafka Manager 简介

       Kafka Manager 是目前最受欢迎的 Kafka 集群管理工具,最早由雅虎开源,用户可以在 Web 界面执行一些简单的集群管理操作。

支持以下功能:

  • 管理 Kafka 集群
  • 方便集群状态监控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • 方便选择分区副本
  • 配置分区任务,包括选择使用哪些 Brokers
  • 可以对分区任务重分配
  • 提供不同的选项来创建及删除 Topic
  • Topic List 会指明哪些topic被删除
  • 批量产生分区任务并且和多个topic和brokers关联
  • 批量运行多个主题对应的多个分区
  • 向已经存在的主题中添加分区
  • 对已经存在的 Topic 修改配置
  • 可以在 Broker Level 和 Topic Level 的度量中启用 JMX Polling 功能
  • 可以过滤在 ZooKeeper 上没有 ids/owners/offsets/directories 的 consumer

二、部署过程

这个流程需要部署三个组件,分别为 Zookeeper、Kafka、Kafka Manager:

  • (1)、Zookeeper: 首先部署 Zookeeper,方便后续部署 Kafka 节点注册到 Zookeeper,用 StatefulSet 方式部署三个节点。
  • (2)、Kafka: 第二个部署的是 Kafka,设置环境变量来指定 Zookeeper 地址,用 StatefulSet 方式部署。
  • (3)、Kafka Manager: 最后部署的是 Kafka Manager,用 Deployment 方式部署,然后打开 Web UI 界面来管理、监控 Kafka。

三、Kubernetes 部署 Zookeeper & Kafka & Kafka Manager

1、创建 StorageClass

由于都是使用 StatefulSet 方式部署的有状态服务,所以 Kubernetes 集群需要提前设置一个 StorageClass 方便后续部署时指定存储分配(如果想指定为已经存在的 StorageClass 创建 PV 则跳过此步骤)。

此处用的是 NFS 存储驱动,如果是其它存储需要提前设置好相关配置

创建 StorageClass 部署文件

nfs-storage.yaml

 1apiVersion: storage.k8s.io/v1
 2kind: StorageClass
 3metadata:
 4  name: nfs-storage
 5provisioner: nfs-client    #动态卷分配服务指定的名称
 6parameters:
 7  archiveOnDelete: "true"  #设置为"false"时删除PVC不会保留数据,"true"则保留数据
 8mountOptions: 
 9  - hard                   #指定为硬挂载方式
10  - nfsvers=4              #指定NFS版本

部署 StorageClass

1$ kubectl apply -f nfs-storage.yaml

2、Kubernetes 部署 Zookeeper

创建 Zookeeper 部署文件

zookeeper.yaml

  1#部署 Service Headless,用于Zookeeper间相互通信
  2apiVersion: v1
  3kind: Service
  4metadata:
  5  name: zookeeper-headless
  6  labels:
  7    app: zookeeper
  8spec:
  9  type: ClusterIP
 10  clusterIP: None
 11  publishNotReadyAddresses: true
 12  ports:
 13  - name: client
 14    port: 2181
 15    targetPort: client
 16  - name: follower
 17    port: 2888
 18    targetPort: follower
 19  - name: election
 20    port: 3888
 21    targetPort: election
 22  selector:
 23    app: zookeeper
 24---
 25#部署 Service,用于外部访问 Zookeeper
 26apiVersion: v1
 27kind: Service
 28metadata:
 29  name: zookeeper
 30  labels:
 31    app: zookeeper
 32spec:
 33  type: ClusterIP
 34  ports:
 35  - name: client
 36    port: 2181
 37    targetPort: client
 38  - name: follower
 39    port: 2888
 40    targetPort: follower
 41  - name: election
 42    port: 3888
 43    targetPort: election
 44  selector:
 45    app: zookeeper
 46---
 47apiVersion: apps/v1
 48kind: StatefulSet
 49metadata:
 50  name: zookeeper
 51  labels:
 52    app: zookeeper
 53spec:
 54  serviceName: zookeeper-headless
 55  replicas: 3
 56  podManagementPolicy: Parallel
 57  updateStrategy:
 58    type: RollingUpdate
 59  selector:
 60    matchLabels:
 61      app: zookeeper
 62  template:
 63    metadata:
 64      name: zookeeper
 65      labels:
 66        app: zookeeper
 67    spec:      
 68      securityContext:
 69        fsGroup: 1001
 70      containers:
 71      - name: zookeeper
 72        image: docker.io/bitnami/zookeeper:3.4.14-debian-9-r25
 73        imagePullPolicy: IfNotPresent
 74        securityContext:
 75          runAsUser: 1001
 76        command:
 77         - bash
 78         - -ec
 79         - |
 80            # Execute entrypoint as usual after obtaining ZOO_SERVER_ID based on POD hostname
 81            HOSTNAME=`hostname -s`
 82            if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; then
 83              ORD=${BASH_REMATCH[2]}
 84              export ZOO_SERVER_ID=$((ORD+1))
 85            else
 86              echo "Failed to get index from hostname $HOST"
 87              exit 1
 88            fi
 89            . /opt/bitnami/base/functions
 90            . /opt/bitnami/base/helpers
 91            print_welcome_page
 92            . /init.sh
 93            nami_initialize zookeeper
 94            exec tini -- /run.sh            
 95        resources: 
 96          limits:
 97            cpu: 500m
 98            memory: 512Mi
 99          requests:
100            cpu: 250m
101            memory: 256Mi
102        env:
103        - name: ZOO_PORT_NUMBER
104          value: "2181"
105        - name: ZOO_TICK_TIME
106          value: "2000"
107        - name: ZOO_INIT_LIMIT
108          value: "10"
109        - name: ZOO_SYNC_LIMIT
110          value: "5"
111        - name: ZOO_MAX_CLIENT_CNXNS
112          value: "60"
113        - name: ZOO_SERVERS
114          value: "
115                  zookeeper-0.zookeeper-headless:2888:3888,
116                  zookeeper-1.zookeeper-headless:2888:3888,
117                  zookeeper-2.zookeeper-headless:2888:3888
118                 "
119        - name: ZOO_ENABLE_AUTH
120          value: "no"
121        - name: ZOO_HEAP_SIZE
122          value: "1024"
123        - name: ZOO_LOG_LEVEL
124          value: "ERROR"
125        - name: ALLOW_ANONYMOUS_LOGIN
126          value: "yes"
127        ports:
128        - name: client
129          containerPort: 2181
130        - name: follower
131          containerPort: 2888
132        - name: election
133          containerPort: 3888
134        livenessProbe:
135          tcpSocket:
136            port: client
137          initialDelaySeconds: 30
138          periodSeconds: 10
139          timeoutSeconds: 5
140          successThreshold: 1
141          failureThreshold: 6
142        readinessProbe:
143          tcpSocket:
144            port: client
145          initialDelaySeconds: 5
146          periodSeconds: 10
147          timeoutSeconds: 5
148          successThreshold: 1
149          failureThreshold: 6
150        volumeMounts:
151        - name: data
152          mountPath: /bitnami/zookeeper
153  volumeClaimTemplates:
154    - metadata:
155        name: data
156        annotations:
157      spec:
158        storageClassName: nfs-storage    #指定为上面创建的 storageclass
159        accessModes:
160          - ReadWriteOnce
161        resources:
162          requests:
163            storage: 5Gi

部署 Zookeeper

  • -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f zookeeper.yaml -n mydlqcloud

3、Kubernetes 部署 Kafka

创建 Kafka 部署文件

kafka.yaml

  1#部署 Service Headless,用于Kafka间相互通信
  2apiVersion: v1
  3kind: Service
  4metadata:
  5  name: kafka-headless
  6  labels:
  7    app: kafka
  8spec:
  9  type: ClusterIP
 10  clusterIP: None
 11  ports:
 12  - name: kafka
 13    port: 9092
 14    targetPort: kafka
 15  selector:
 16    app: kafka
 17---
 18#部署 Service,用于外部访问 Kafka
 19apiVersion: v1
 20kind: Service
 21metadata:
 22  name: kafka
 23  labels:
 24    app: kafka
 25spec:
 26  type: ClusterIP
 27  ports:
 28  - name: kafka
 29    port: 9092
 30    targetPort: kafka
 31  selector:
 32    app: kafka
 33---
 34apiVersion: apps/v1
 35kind: StatefulSet
 36metadata:
 37  name: "kafka"
 38  labels:
 39    app: kafka
 40spec:
 41  selector:
 42    matchLabels:
 43      app: kafka
 44  serviceName: kafka-headless
 45  podManagementPolicy: "Parallel"
 46  replicas: 3
 47  updateStrategy:
 48    type: "RollingUpdate"
 49  template:
 50    metadata:
 51      name: "kafka"
 52      labels:
 53        app: kafka
 54    spec:      
 55      securityContext:
 56        fsGroup: 1001
 57        runAsUser: 1001
 58      containers:
 59      - name: kafka
 60        image: "docker.io/bitnami/kafka:2.3.0-debian-9-r4"
 61        imagePullPolicy: "IfNotPresent"
 62        resources:
 63          limits:
 64            cpu: 500m
 65            memory: 512Mi
 66          requests:
 67            cpu: 250m
 68            memory: 256Mi
 69        env:
 70        - name: MY_POD_IP
 71          valueFrom:
 72            fieldRef:
 73              fieldPath: status.podIP
 74        - name: MY_POD_NAME
 75          valueFrom:
 76            fieldRef:
 77              fieldPath: metadata.name
 78        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
 79          value: "zookeeper"                 #Zookeeper Service 名称
 80        - name: KAFKA_PORT_NUMBER
 81          value: "9092"
 82        - name: KAFKA_CFG_LISTENERS
 83          value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"
 84        - name: KAFKA_CFG_ADVERTISED_LISTENERS
 85          value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'
 86        - name: ALLOW_PLAINTEXT_LISTENER
 87          value: "yes"
 88        - name: KAFKA_HEAP_OPTS
 89          value: "-Xmx512m -Xms512m"
 90        - name: KAFKA_CFG_LOGS_DIRS
 91          value: /opt/bitnami/kafka/data
 92        - name: JMX_PORT
 93          value: "9988"
 94        ports:
 95        - name: kafka
 96          containerPort: 9092
 97        livenessProbe:
 98          tcpSocket:
 99            port: kafka
100          initialDelaySeconds: 10
101          periodSeconds: 10
102          timeoutSeconds: 5
103          successThreshold: 1
104          failureThreshold: 2
105        readinessProbe:
106          tcpSocket:
107            port: kafka
108          initialDelaySeconds: 5
109          periodSeconds: 10
110          timeoutSeconds: 5
111          successThreshold: 1
112          failureThreshold: 6
113        volumeMounts:
114        - name: data
115          mountPath: /bitnami/kafka
116  volumeClaimTemplates:
117    - metadata:
118        name: data
119      spec:
120        storageClassName: nfs-storage    #指定为上面创建的 storageclass
121        accessModes:
122          - "ReadWriteOnce"
123        resources:
124          requests:
125            storage: 5Gi

部署 Kafka

  • -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f kafka.yaml -n mydlqcloud

4、Kubernetes 部署 Kafka Manager

创建 Kafka Manager 部署文件

kafka-manager.yaml

 1apiVersion: v1
 2kind: Service
 3metadata:
 4  name: kafka-manager
 5  labels:
 6    app: kafka-manager
 7spec:
 8  type: NodePort
 9  ports:
10  - name: kafka
11    port: 9000
12    targetPort: 9000
13    nodePort: 30900
14  selector:
15    app: kafka-manager
16---
17apiVersion: apps/v1
18kind: Deployment
19metadata:
20  name: kafka-manager
21  labels:
22    app: kafka-manager
23spec:
24  replicas: 1
25  selector:
26    matchLabels:
27      app: kafka-manager
28  template:
29    metadata:
30      labels:
31        app: kafka-manager
32    spec:
33      containers:
34      - name: kafka-manager
35        image: zenko/kafka-manager:1.3.3.22
36        imagePullPolicy: IfNotPresent
37        ports:
38        - name: kafka-manager
39          containerPort: 9000
40          protocol: TCP
41        env:
42        - name: ZK_HOSTS
43          value: "zookeeper:2181"
44        livenessProbe:
45          httpGet:
46            path: /api/health
47            port: kafka-manager
48        readinessProbe:
49          httpGet:
50            path: /api/health
51            port: kafka-manager
52        resources:
53          limits:
54            cpu: 500m
55            memory: 512Mi
56          requests:
57            cpu: 250m
58            memory: 256Mi

部署 Kafka Manager

  • -n:指定应用启动的 Namespace,替换自己集群的 Namespace
1$ kubectl apply -f kafka-manager.yaml -n mydlqcloud

四、进入 Kafka Manager 管理 Kafka 集群

这里的 Kubernetes 集群地址为:192.168.2.11,并且在上面设置 Kafka-Manager 网络策略为 NodePort 方式,且设置端口为 30900,这里输入地址:http://192.168.2.11:30900 访问 Kafka Manager。

进入后先配置 Kafka Manager,增加一个 Zookeeper 地址。

配置三个必填参数:

  • Cluster Name:自定义一个名称,任意输入即可。
  • Zookeeper Hosts:输入 Zookeeper 地址,这里设置为 Zookeeper 服务名+端口。
  • Kafka Version:选择 kafka 版本。

配置完成后就可以看到新增了一条记录,点进去就可以查看相关集群信息。

五、附录:镜像参数配置

Zookeeper 镜像可配置变量参数

  • 镜像 github 地址:https://github.com/bitnami/bitnami-docker-zookeeper
参数名称 描述
ZOO_PORT_NUMBER Zookeeper客户端端口。默认值:2181
ZOO_SERVER_ID 集合中服务器的ID。默认值:1
ZOO_TICK_TIME ZooKeeper用于心跳的基本时间单位(以毫秒为单位)。默认值:2000
ZOO_INIT_LIMIT ZooKeeper用于限制仲裁中ZooKeeper服务器连接到领导者的时间长度。默认值:10
ZOO_SYNC_LIMIT 服务器与领导者的过时距离。默认值:5
ZOO_MAX_CLIENT_CNXNS 限制单个客户端可能对ZooKeeper集合的单个成员进行的并发连接数。默认60
ZOO_SERVERS 逗号,空格或冒号分隔的服务器列表。示例:zoo1:2888:3888,zoo2:2888:3888。没有默认值。
ZOO_CLIENT_USER 将使用Zookeeper客户端进行身份验证的用户。默认值:无默认值。
ZOO_CLIENT_PASSWORD 将使用Zookeeper客户端进行身份验证的密码。没有默认值。
ZOO_SERVER_USERS 逗号,分号或空格分隔的要创建的用户列表。示例:user1,user2,admin。没有默认值
ZOO_SERVER_PASSWORDS 逗号,半精或空格分隔的密码列表,在创建时分配给用户。示例:pass4user1,pass4user2,pass4admin。没有默认值
ZOO_ENABLE_AUTH 启用Zookeeper身份验证。它使用SASL / Digest-MD5。默认值:否
ZOO_RECONFIG_ENABLED 启用ZooKeeper动态重配置。默认值:否
ZOO_HEAP_SIZE Java堆选项(Xmx和XM)的大小(MB)。如果通过Xmx配置Xm,则忽略此env var JVMFLAGS。默认值:1024
ZOO_LOG_LEVEL Zookeeper日志级别。可用级别为:ALL,DEBUG,INFO,WARN,ERROR,FATAL,OFF,TRACE。默认值:INFO
ALLOW_ANONYMOUS_LOGIN 如果设置为true,则允许接受来自未经身份验证的用户的连接。默认值:否
JVMFLAGS ZooKeeper进程的默认JVMFLAGS。没有默认值

Kafka 镜像可配置变量参数

  • 镜像 github 地址:https://github.com/bitnami/bitnami-docker-kafka
参数名称 描述
KAFKA_CFG_ZOOKEEPER_CONNECT Zookeeper集群地址,例如"zookeeper:2181"
KAFKA_PORT_NUMBER Kafka端口,例如"9092"
ALLOW_PLAINTEXT_LISTENER 是否启用Plaintext侦听器,默认"false"
KAFKA_CFG_LISTENERS Kafka 监听列表,broker对外提供服务时绑定的IP和端口
KAFKA_CFG_ADVERTISED_LISTENERS 给客户端用的发布至zookeeper的监听,broker 会上送此地址到zookeeper,zookeeper会将此地址提供给消费者,消费者根据此地址获取消息。
KAFKA_HEAP_OPTS Java JVM堆内存大小配置,例如"-Xmx512m -Xms512m"
KAFKA_CFG_LOGS_DIRS Kafka 日志存储目录
JMX_PORT JMX端口配置,设置此参数才能开启JMX,例如设置为"9988"
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM 是否启用基于主机名的认证认证,例如想开启可以设置为"https"
KAFKA_CFG_BROKER_ID Broker ID值,每个节点值都唯一。默认为"-1",自动生成BrokerId
KAFKA_CFG_DELETE_TOPIC_ENABLE 是否允许删除Topic,默认"false"
KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES 此项配置指定时间间隔,强制进行fsync日志,默认"10000"
KAFKA_CFG_LOG_FLUSH_INTERVAL_MS 此项配置用来置顶强制进行fsync日志到磁盘的时间间隔"1000"
KAFKA_CFG_LOG_RETENTION_BYTES 每个Topic下每个Partition保存数据的总量,超过限制都会删除一个段文件,默认"1073741824"
KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求,默认"300000"
KAFKA_CFG_LOG_RETENTION_HOURS 每个日志文件删除之前保存的时间,默认数据保存时间对所有topic都一样,默认"168"
KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION 指定broker将用于将消息添加到日志文件的消息格式版本
KAFKA_CFG_MAX_MESSAGE_BYTES Kafka允许的最大记录批大小。默认"1000000"
KAFKA_CFG_SEGMENT_BYTES 日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。默认"1073741824"
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR 对replica的数目进行配置,默认值为"1",表示不对topic进行备份。如果配置为2,表示除了leader节点,对于topic里的每一个partition,都会有一个额外的备份。
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR Topic的offset的备份份数。建议设置更高的数字保证更高的可用性
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR 事务Topic的复制因子(设置得更高以确保可用性)
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR 事务Topic的副本数
KAFKA_CFG_NUM_IO_THREADS 用来处理请求的I/O线程的数目
KAFKA_CFG_NUM_NETWORK_THREADS 用来处理网络请求的网络线程数目
KAFKA_CFG_NUM_PARTITIONS 每个主题的日志分区的默认数量
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR 每个数据目录中的线程数,用于在启动时日志恢复,并在关闭时刷新。默认"1"
KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES SO_RCVBUFF缓存大小,server进行socket连接时所用
KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES 允许的最大请求尺寸,这将避免server溢出,它应该小于Java heap size,默认"104857600"
KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES Kafka追加消息的最大尺寸
KAFKA_CFG_ZOOKEEPER_CONNECT_TIMEOUT_MS 连接到zookeeper的超时时间(ms)

---END---


  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。