首页 > 技术文章 > Kafka集群搭建

wusier 2021-04-10 15:38 原文

ZooKeeper 的作用

Kafka使用ZooKeeper管理集群,ZooKeeper用于协调服务器或集群拓扑,ZooKeeper是配置信息的一致性文件系统。你可以选择Kafka自带的Zookeeper,也可以选择单独部署,一台Linux主机开放三个端口即可构建一个简单的伪ZooKeeper集群。

zookeeper中记录了broker的id 、消费者消费数据的offset,消费者与partition的对应关系(ConsumerA—> Partition-0, ConsumerB—> Partition-1)

ZooKeeper可以将拓扑更改发送到Kafka,如果集群中的某台服务器宕机或者某个topic被添加、删除,集群中的每个节点都可以知道新服务器何时加入,ZooKeeper提供Kafka Cluster配置的同步视图。

Zookeeper简介

ZooKeeper 是一个开源的分布式框架,提供了协调分布式应用的基本服务。它向外部应用 暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(group maintenance)等,简化分布式应用协调及其管理的难度。 它是 google 的 chubby 一个开源的实现。 它本身可以搭建成一个集群,这个 zk 集群用来对应用程序集群进行管理,监视应用程序集群中各个节点的状态,并根据应用程序集群中各个节点提交的反馈信息决定下一步的合理操作。 做分布式锁很有效 image

Zookeeper下载和安装

实现的是Zookeeper集群的创建,注意要修改zoo.dfg文件名称

(base) root@top1server:~# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
(base) root@top1server:~# tar -zvxf zookeeper-3.4.10.tar.gz
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper1
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper2
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper3
(base) root@top1server:~/zookeeper/zookeeper1/conf# mv zoo_sample.cfg zoo.cfg
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh status

image

image

部署三个节点的Zookeeper伪分布集群

在同一台服务器上,部署一个 3 个 ZooKeeper 节点组成的集群,这样的集群叫伪分布式集群,而如果集群中的 3 个节点分别部署在 3 个服务器上,那么这种集群就叫真正的分布式集群。

(base) root@top1server:~/zookeeper# cd zookeeper1
(base) root@top1server:~/zookeeper/zookeeper1# mkdir data
(base) root@top1server:~/zookeeper/zookeeper1# mkdir logs
(base) root@top1server:~/zookeeper/zookeeper1# touch data/myid
(base) root@top1server:~/zookeeper/zookeeper1# cd data/
(base) root@top1server:~/zookeeper/zookeeper1/data# cat myid
1

设置配置文件 zoo.cfg 的内容如下:
image

配置文件中的配置项的含义参见下面的介绍。

  • 用同样的方法,在 zookeeper2 和 zookeeper3 的相应位置创建 zoo.cfg,文件内容复制zookeeper1 的 zoo.cfg。
  • 只不过需要改动 clientport、dataDir、dataLogDir 三个配置项,zookeeper2 的 clientport 改为 2182,zookeeper3 的 clientport 改为 2183,而 dataDir和 dataLogDir 都修改为相应的目录,就好了。

启动 zk 集群

接下来再分别启动服务

(base) root@top1server:~/zookeeper/zookeeper1/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper2/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper3/bin# sh zkServer.sh start

这样 zookeeper 集群的 3 个节点都启动起来了。
image

image

image

image
一个leader多个server,也遵循2n+1的选举策略,想保证高可用至少有三台机器

客户端接入集群

进入 zookeeper 集群中任意一个节点的 bin 目录下,启动一个客户端,接入已经启动好的zookeeper 集群。
这里的 server 可以填写集群中的任何一个节点的 ip,端口号是对应 ip的节点的配置文件中 clientport 的值。

./zkCli.sh –server 127.0.0.1:2181

真实分布式集群需要注意的地方

真正的分布式集群和伪分布式集群不一样的地方在于配置文件。

  1. clientport 端口各个节点一样就行。
  2. server.1=127.0.0.1:8880:7770 中的 ip 要修改成对应的 server 的 ip,后边的两个端口号不需要不同,各个节点都一样就可以了。
    其他地方伪分布式和真正分布式都是一样的。

ZooKeeper 配置文件中的配置项的含义

配置文件中配置项的含义:

  • tickTime: zookeeper 中使用的基本时间单位,毫秒值,比如可以设为 1000,那么基本时间单位就是 1000ms,也就是 1s。
  • initLimit: zookeeper 集群中的包含多台 server,其中一台为 leader,集群中其余的 server 为 follower,initLimit 参数配置初始化连接时,follower 和 leader 之间的最长 心 跳 时 间 。 如 果 该 参 数 设 置 为 5 , 就 说 明 时 间 限 制 为 5 倍 tickTime , 即5*1000=5000ms=5s。
  • syncLimit: 该参数配置 leader 和 follower 之间发送消息,请求和应答的最大时间长度。如果该参数设置为 2,说明时间限制为 2 倍 tickTime,即 2000ms。
  • dataDir: 数据目录. 可以是任意目录,一般是节点安装目录下 data 目录。
  • dataLogDir: log目录, 同样可以是任意目录,一般是节点安装目录下的logs目录。如果没有设置该参数,将使用和 dataDir 相同的设置。
  • clientPort: 监听 client 连接的端口号。
  • server.X=A:B:C 其中 X 是一个数字, 表示这是第几号 server,它的值和 myid 文件中的值对应。A 是该 server 所在的 IP 地址。B 是配置该 server 和集群中的 leader 交换消息所使用的端口。C 配置选举 leader 时所使用的端口。由于配置的是伪集群模式,所以各个 server 的 B, C 参数必须不同,如果是真正分布式集群,那么 B 和 C 在各个节点上可以相同,因为即使相同由于节点处于不同的服务器也不会导致端口冲突

Zookeeper 常用命令

  1. 启动 ZK 服务: bin/zkServer.sh start
  2. 查看 ZK 服务状态: bin/zkServer.sh status
  3. 停止 ZK 服务: bin/zkServer.sh stop
  4. 重启 ZK 服务: bin/zkServer.sh restart
  5. 连接服务器: zkCli.sh -server 127.0.0.1:2181

image

创建节点

使用 create 命令,可以创建一个 Zookeeper 节点, 如

create [-s] [-e] path data acl

其中,-s 或-e 分别指定节点特性,顺序或临时节点,若不指定,则表示持久节点;acl 用来进行权限控制。

image
顺序节点可以看到创建的节点后面添加了一串数字以示区别。
image

image
临时节点在退出会自动删除

使用 create /zk-permanent 123 命令创建 zk-permanent 永久节点
可以看到永久节点不同于顺序节点,不会自动在后面添加一串数字。
image

读取节点

与读取相关的命令有 ls 命令和 get 命令,ls 命令可以列出 Zookeeper 指定节点下的所有子节点,只能查看指定节点下的第一级的所有子节点;
get 命令可以获取 ZK 指定节点的数据内容和属性信息。其用法分别如下

ls path [watch]
get path [watch]
ls2 path [watch]

ls2可以查看所有内容
get可以获取系欸但的数据内容和属性

更新节点

使用 set 命令,可以更新指定节点的数据内容,用法如下

set path data [version]

其中,data 就是要更新的新内容,version 表示数据版本,如将/zk-permanent 节点的数据更新为 456,可以使用如下命令:set /zk-permanent 456
dataVersion 会变为 1 了,表示进行了更新。

删除节点

使用 delete 命令可以删除 Zookeeper 上的指定节点,用法如下

delete path [version]

其 中 version 也 是 表 示 数 据 版 本 , 使 用 delete /zk-permanent 命 令 即 可 删 除/zk-permanent 节点

值得注意的是,若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点

zk的使用都是靠这些节点使用

分布式锁的生成是用临时顺序节点来实现的

Kafka概述

Kafka消息队列

image
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
(2)发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

为什么需要消息队列

1)解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3)扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4)灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8)异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

什么是 Kafka

在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算。
1)Apache Kafka 是一个开源消息系统,由 Scala 写成。是由 Apache 软件基金会开发的一个开源消息系统项目。
2)Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3)Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为broker。
4)无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。
image
1)Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2)Consumer :消息消费者,向 kafka broker 取消息的客户端;
3)Topic :可以理解为一个队列;
4) Consumer Group (CG):这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
5)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic;
6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给 consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
7)Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你 想 找 位 于 2049 的 位 置 , 只 要 找 到 2048.kafka 的 文 件 即 可 。 当 然 the first offset 就 是
00000000000.kafka。

Kafka 单节点运行方式

https://kafka.apache.org/

下载代码

下载 kafka_2.13-2.7.0 版本并且解压。
https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz

wget https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zvxf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0/

首先为每个 broker 创建一个配置文件,直接复制多个server.properties,并且修改他们的log日志和id

Kafka 集群部署方式

配置broker的id
image
设置log文件地址
image
关联zookeeper
image

启动服务

现在启动 Kafka 服务器:

bin/kafka-server-start.sh config/server.properties

创建topic

创建一个名为“test”的 topic,它有一个分区和一个副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行 list(列表)命令来查看这个 topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181 test

除了手工创建 topic 外,你也可以配置你的 broker,当发布一个不存在的 topic 时自动创建 topic。

推荐阅读