首页 > 技术文章 > <zk在大型分布式系统中的应用>

wttttt 2017-10-11 21:09 原文

Hadoop

  • 在hadoop中,zk主要用来实现HA(High Availability)。这部分逻辑主要集中在hadoop common的HA模块中,HDFS的NameNode和Yarn的ResourceManager都是基于此HA模块来实现自己的HA功能的。同时,在YARN中又特别提供了zk来存储应用的运行状态。

YARN

  • Yarn主要由ResourceManager、NodeManager、ApplicationMaster和Container四部分组成。
  • 其中最核心的就是RM,它作为全局的资源管理器,负责整个系统的资源管理和分配。

RM单点问题

  • RM是YARN中非常复杂的一个组件,负责集群中所有资源的统一管理和分配,同时接受各个NM的资源汇报信息,并把这些信息按照一定的策略分配给各个app。
  • ResourceManager HA的解决方案就是使用 Active/Standby模式的RM HA架构
  • 可以看到,在运行期间会有多个RM存在,但只有一个处于Active状态,其他的是standby。当active节点无法正常工作时,其余处于standby的节点则会通过竞争选举产生新的节点。

主备切换

    • RM使用基于zk实现的ActiveStandbyElector组件来确定RM的状态:Active或Standby。
      具体做法如下:
      1. 创建锁节点:在zk上有一个类似于/yarn-leader-election/pseudo-yarn-rm-cluster的锁节点,所有rm在启动时都会去竞争一个lock子节点/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock,该节点是临时节点。
      2. 注册Watcher监听:所有standby状态的rm都会向/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock节点注册一个节点变更的Watcher监听,利用临时节点的特性,能快速感知到Active状态的RM的运行状态。
      3. 主备切换:当active节点不可用时,其对应zk上的Lock节点也会随之被删除,其他standby状态的rm都会接收到zk server的watcher事件通知,然后重复步骤1。
    • 以上,实际上就是通过临时节点 + Watcher事件通知来实现的。
    • HDFS中的NameNode和RM模块都是使用ActiveStandbyElector组件来实现各自的HA的。

Fencing(隔离)

  • 在分布式环境中,经常会出现诸如单机“假死”的情况。所谓“假死”是指机器由于网络闪断或是自身负载过高(常见的有GC占用时间过长或CPU的负载过高等)而导致无法正常地对外进行及时响应
  • 在上述主备切换过程中,我们假设RM集群由rm1和rm2组成,且rm1为active,rm2为standby。某一刻rm1发生“假死”现象,此时zk认为rm1挂了,从而主备切换,rm2称为active。但是随后,rm1恢复了正常,其依然认为自己还处于active。那么就出现了我们常说的分布式“脑裂”(Brain-Split)现象,即存在多个处于Active状态的rm各司其职。
  • Yarn引入了Fencing机制来解决上述问题,即借助zk数据节点的ACL权限控制机制来实现不同rm之间的隔离
  • 具体做法很简单:在主备切换过程中,多个rm之间通过竞争创建锁节点来实现主备状态的确定。这个过程中,有一点需要改进:创建的根节点必须携带zk的ACL信息,目的是为了独占该根节点,以防止其他RM对该节点进行更新。
  • 具体来说:
    • RM1出现假死之后,zk就会将其创建的锁节点移除,此时RM2会创建相应的锁节点,并切换为Active状态。RM1恢复之后,会试图去更新zk的相关数据,但是此时发现自己没有权限更新zk的相关数据节点。从而rm1自动切换到Standby状态。

RM状态存储

  • 在RM中,RMStateStore能够存储一些RM内部状态信息,包括Application以及它们的Attempts信息、Delegatioln Token及Version Information等。
  • RMStateStore中绝大多数信息都是不需要持久化存储,因为很容易从上下文信息中将其重构出来。
  • 在存储的设计方案中,提供了三种可能的实现,分别如下:
    • 基于内存实现,一般用于日常开发测试;
    • 基于文件系统的实现,如HDFS;
    • 基于zk的实现
  • 由于这些状态数据量都不大,因此hadoop官方建议基于zk来实现状态信息存储
  • 在zk上,rm的状态信息都存储在/rmstore这个根节点上,其数据节点的组织结构如下:
  • Please read this link.

HBase

  • HBase是一个基于Hadoop文件系统设计的面向海量数据的高可靠性、高性能、面向列、可伸缩的分布式存储系统。
  • 与大部分分布式NoSQL数据库不同的是,HBase针对数据写入具有强一致性,甚至包括索引列也实现了强一致性。
  • HBase整体架构如下图:
  • 可以看到,在整个HBase架构体系中,zk是串联起HBase集群与client的关键所在。
  • 早期Hbase没有引入zk时,存在一系列问题:
    • RegionServer挂掉时,系统无法及时得知信息,client也无法知晓,因此服务难以迁移至其他RegionServer上
    • 类似的问题都是缺少相应的分布式协调组件
  • 下面从以下几个方面讲解zk在HBase中的应用场景。

系统冗错

  • (系统冗错其实就是监控rs
  • 当HBase启动时,每个RegionServer都会到zk的/hbase/rs节点下创建一个信息节点(rs状态节点),例如/hbase/rs/[Hostname],同时HMaster会监听该节点。
  • 当某个rs挂掉时,zk会因为在一段时间内无法接受其心跳信息(即Session失效),而删除掉该rs对应的rs状态节点。同时,HMaster收到zk的NodeDelete通知,并立即开始冗错工作:
    • HMaster会将该RegionServer所处理的数据分片(Region)重新路由到其他节点上,并记录到Meta信息中供client查询
  • 那么,为什么不直接让HMaster来负责RegionServer的监控呢?是因为通过心跳机制来管理rs状态会使HMaster的负载随着系统容量而不断增大。并且,HMaster可能挂掉,因此数据还需要持久化。

RootRegion管理

  • 对HBase集群而言,数据存储的位置信息是记录在元数据分片,也就是RootRegion上的。每次client发起新的请求,需要知道数据的位置,就会去查询RootRegion,而RootRegion自身的位置是记录在zk上的。当RootRegion发生变化,比如Region的手工移动、Balance或是RootRegion所在server发生故障等时,就能通过zk来感知到这一变化并做出一系列容灾措施。

Region状态管理

  • Region是HBase中数据的物理切片,每个Region中记录了全局数据的一小部分。
  • 对一个分布式系统而言,Region是会经常发生变更的。一旦Region发生移动,它必然会经历Offline和重新Online的过程。
  • Offline期间数据是不能被访问的,并且Region的这个状态必须被全局知晓,否则可能会出现事务性的异常。

分布式SplitLog任务管理

  • 当某台RS挂掉时,由于总有一部分新写入的数据还没有持久化到HFile中,因此在迁移该RegionServer的服务时,一个重要的工作就是从HLog中恢复这部分还在内存中的数据(WAL)。
  • 而这部分工作最关键的一步就是SpitLog,即HMaster需要遍历该RS的HLog,并按Region切分成小块移动到新地址下,并进行数据的Replay。
  • 由于单个RS的日志量相对庞大(可能有数千个Region,上GB的日志),一个快速恢复的可行方案就是将这个处理HLog的任务分配给多台rs来共同处理。
  • 因此需要一个持久化组件来辅助HMaster完成任务的分配。当前的做法是,在zk上创建一个splitlog节点,将rs和待处理的region之间的映射关系存放到该节点。然后各个rs到该节点上领取任务并在执行后将成功or失败的信息更新到节点。
  • 整个过程中,zk担负了分布式集群中相互通知和信息持久化的角色。

Kafka

  • kafka:开源分布式消息系统
  • 主要用于实现低延迟的发送和收集大量的事件和日志数据。
  • kafka是一个吞吐量极高分布式消息系统,其整体设计是典型的发布与订阅模式
  • 在kafka集群中,没有“中心主节点”的概念,所有server都是对等的。 -->   因而可以在不做任何配置更改的情况下实现server的添加和删除。同样,消息的生产者和消费者也能够随意重启和机器的上下线。
  • kafka中生产者和消费者之间的部署关系如下图:

术语介绍

  • Kafka是一个近似符合JMS规范的消息中间件实现。
  • producer:
  • consumer:
  • topic:由用户定义并配置在kafka server端,用于建立producer和consumer之间的订阅关系
  • partition:一个topic下面会分为多个分区
  • broker:kafka的服务器,用于存储消息。
  • group:用于归组同类消费者
  • offset:

Broker注册

  • 虽然broker是分布式部署并且相互之间是互相独立运行的,但还是需要有一个注册系统能够将整个集群中的broker server都管理起来。  -->   在kafka的设计中,选择了使用zk来进行所有broker的管理。
  • broker节点:/brokers/ids   -->    每个broker sever启动时都会到zk上注册 /broker/ids/[0...N]  (临时节点,以动态表征broker server的可用性),broker会把自己的IP和port写入该节点。
  • 可以看出每个broker节点都有一个全局唯一的 Broker ID

Topic注册

  • kafka中,用一个topic的消息会分成多个partition并分布到多个broker中。   -->   映射关系由zk维护
  • topic节点:/brokers/topics/[topic]   (临时节点)

生产者负载均衡

  • 因为同一个topic会被分区从而分布到不同的broker server上。因而生产者需要将消息合理地发送到这些分布式的broker上。 -->  从而产生了如何进行生产者负载均衡的问题。
  • 对于生产者的负载均衡,kafka支持传统的四层负载均衡,同时也支持使用zk的方式来实现负载均衡。

四层负载均衡

  • 设计简单:根据生产者的IP地址和端口来为其确定一个相关联的broker。通常一个生产者只会对应单个broker
  • 优点:1. 整体逻辑简单,不需要引入第三方系统。2. 每个producer只需与broker维护单个TCP链接即可。
  • 缺点:1. 无法做到真正的负载均衡。因为每个producer产生的消息量不同,从而导致不同broker接收到的消息总数非常不均匀。2. 生产者无法实时感知到broker的新增与删除。

使用zk进行负载均衡

  • 在kafka中,client使用了基于zk的负载均衡策略来解决producer的负载均衡问题
  • kafka producer会对zk上的“broker的新增与减少”、“Topic的新增与减少”和“Broker与Topic关联关系的变化”等事件注册watcher监听,从而实现一种动态的负载均衡机制。

消费者负载均衡

  • kafka consumer也需要进行负载均衡来实现多个consumers合理地从对应broker server上接收消息。(生产者负载均衡则是producer合理地选择broker来发送消息)
  • 对于一个消费者分组,如果组内的consumers发生变更或broker server发生变更,会触发消费者负载均衡。
  • 关于consumer group:group是对整个kafka集群的概念,kafka保证每条消息在同一个group内只会被某一个consumer消费。(比如说kafka + Spark streaming的组合,那么所有的streaming executor组成一个group,去消费某个topic。)
  • kafka提供的consumer负载均衡算法:
    • topic的所有消息分区Pt  -->  需要对Pt进行排序,从而使分布在同一个broker server的分区尽量靠在一起
    • 同一个group中的所有消费者Cg  -->  对Cg进行排序。
    • N = size(Pt) / size(Cg)
    • 将编号为 i * N ~ (i + 1) * N - 1的消息分区分配给消费者Ci
    • 更新zk上消息分区与消费者Ci的关系

kafka小结

  • kafka从设计之初就是一个大规模分布式消息中间件,其server端存在多个broker,同时为了达到负载均衡,将每个topic的消息分成了多个partition,并分布在不同broker上。多个消费者和生产者能够同时发生和接收消息。
  • kafka使用zk作为其分布式协调框架,能很好地将消息生产、消息存储和消息消费结合起来。借助zk,来保持包括producer、consumer和broker在内的所有组件无状态情况下,建立起producer和consumer之间的订阅关系,并实现了producer和consumer的负载均衡。

 

推荐阅读