首页 > 技术文章 > Kafka 消费者

lenoblog 2020-11-25 14:04 原文

消费者和消费组

在Kafka中,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每个消费者只能消费所分配到的分区中的消息。

而每一个分区只能被一个消费组中的一个消费者所消费。

入上图所示,我们可以设置两个消费者组来实现广播消息的作用,消费组A和组B都可以接受到生产者发送过来的消息。

消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。

对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,

就会有消费者分配不到任何分区。

如下:一共有8个消费者,7个分区,那么最后的消费者C7由于分配不到任何分区而无法消费任何消息。

对于消息中间件而言,一般有两种消息投递模式:点对点模式和发布/订阅模式。

点对点模式:基于队列的,消息生产者发送消息到队列,消息消费才从队列中接收消息。

发布/订阅模式:定义了如何向一个内容节点发布和订立消息,这个内部节点称为主题,主题可以是消息传递的中介。

如果所有消费者都隶属于同一个消费者,则所有消费都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这相当于点对点模式的应用;

如果所有的消息费都隶属不同的消费组,则所有的消费都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这相当于发布/订阅的应用;

消费端分区分配策略

Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。

RangeAssignor分配策略
默认情况下,采用 RangeAssignor 分配策略。

RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。

对于每一个主题,RangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,

如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设消费组内有2个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有4个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。

最终的分配结果为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3

假设上面例子中2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2

可以明显地看到这样的分配并不均匀。

RoundRobinAssignor分配策略
RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 分配策略的分区分配会是均匀的。

如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。

假设消费组内有3个消费者(C0、C1 和 C2),t0、t0、t1、t2主题分别有1、2、3个分区,即整个消费组订阅了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。

具体而言,消费者 C0 订阅的是主题 t0,消费者 C1 订阅的是主题 t0 和 t1,消费者 C2 订阅的是主题 t0、t1 和 t2,那么最终的分配结果为:

消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

可以看 到 RoundRobinAssignor 策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区 t1p1 分配给消费者 C1。

StickyAssignor分配策略
这种分配策略,它主要有两个目的:

  1. 分区的分配要尽可能均匀。
  2. 分区的分配尽可能与上次分配的保持相同。

假设消费组内有3个消费者(C0、C1 和 C2),它们都订阅了4个主题(t0、t1、t2、t3),并且每个主题有2个分区。

也就是说,整个消费组订阅了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这8个分区。最终的分配结果如下:

消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

再假设此时消费者 C1 脱离了消费组,那么分配结果为:

消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1

StickyAssignor 分配策略如同其名称中的“sticky”一样,让分配策略具备一定的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

再均衡(Rebalance)

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。

弊端:

  1. 在再均衡发生期间,消费组内的消费者是无法读取消息的。
  2. Rebalance 很慢。如果一个消费者组里面有几百个 Consumer 实例,Rebalance 一次要几个小时。
  3. 在进行再均衡的时候消,费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,

            之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。

Rebalance 发生的时机有三个:

  1. 组成员数量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

后两类通常是业务的变动调整所导致的,我们一般不可控制,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。

如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。

Consumer端可以设置session.timeout.ms,默认是10s,表示如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。

Consumer端还可以设置heartbeat.interval.ms,表示发送心跳请求的频率。

以及max.poll.interval.ms 参数,它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。

它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

所以知道了上面几个参数后,我们就可以避免以下两个问题:

  1. 非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。
    所以我们在生产环境中可以这么设置:
    • 设置 session.timeout.ms = 6s。
    • 设置 heartbeat.interval.ms = 2s。
  2. 必要 Rebalance 是 Consumer 消费时间过长导致的。如何消费任务时间达到8分钟,而max.poll.interval.ms设置为5分钟,那么也会发生Rebalance,所以如果有比较重的任务的话,可以适当调整这个参数。
  3. Consumer 端的频繁的 Full GC导致的长时间停顿,从而引发了 Rebalance。

消费者组再平衡全流程

重平衡过程是靠消费者端的心跳线程(Heartbeat Thread),通知到其他消费者实例的。

当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。

当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

所以,实际上heartbeat.interval.ms不止是设置了心跳的间隔时间,还可以控制重平衡通知的频率。

消费者组状态机

重平衡一旦开启,Broker 端的协调者组件就要完成整个重平衡流程,Kafka 设计了一套消费者组状态机(State Machine)来实现。

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

状态机的各个状态流转:

当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。

因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

组协调器(GroupCoordinator)

GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。协调器最重要的职责就是负责执行消费者再均衡的操作。

消费者端重平衡流程

在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。即JoinGroup 请求和 SyncGroup 请求。

  1. 加入组
    当组内成员加入组时,它会向协调器发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调器就能收集到所有成员的订阅信息。

  2. 选择消费组领导者
    一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
    这里的领导者是具体的消费者实例,它既不是副本,也不是协调器。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

  3. 选举分区分配策略
    这个分区分配的选举是根据消费组内的各个消费者投票来决定的。
    协调器会收集各个消费者支持的所有分配策略,组成候选集 candidates。每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。

  4. 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。

     如果有消费者并不支持选出的分配策略,那么就会报出异常 IllegalArgumentException:Member does not support protocol。

5.发送 SyncGroup 请求

协调器会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案,然后领导者发送 SyncGroup 请求给协调器。

6.响应SyncGroup
组内所有的消费者都会发送一个 SyncGroup 请求,只不过不是领导者的请求内容为空,然后就会接收到一个SyncGroup响应,接受订阅信息。

  

ref : https://www.cnblogs.com/luozhiyun/p/12443889.html

推荐阅读