首页 > 解决方案 > 卡夫卡消费者错误:标记协调员死亡

问题描述

我在 Kafka 0.10.0.1 集群中有一个包含 10 个分区的主题。我有一个产生多个消费者线程的应用程序。对于这个主题,我将产生 5 个线程。在我的应用程序日志中多次看到此条目

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer

然后有几个条目说(Re-)joining group notifications-consumer. 之后我也看到一个警告说

Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time 
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.

现在我已经像这样调整了我的消费者配置

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);

因此,即使在正确调整配置后,我仍然会收到此错误。在重新平衡期间,我们的应用程序完全没有响应。请帮忙。

标签: apache-kafka

解决方案


由于session.timeout.ms您仅控制由于心跳引起的超时,这意味着session.timeout.ms自上次心跳以来已经过去了几毫秒,并且集群将您声明为死节点并触发重新平衡。

KIP-62之前,心跳是在轮询中发送的,但现在被移动到特定的后台线程,以避免在您花费更多时间而不是session.timeout.ms调用另一个时被驱逐出集群poll()。将心跳分离到特定线程可以将处理与告诉集群您已启动并正在运行分离,但这会引入进程处于活动状态但没有取得进展的“活锁”情况的风险,因此除了使心跳独立引入了poll一个新的超时,以确保消费者还活着并取得进展。文档说明了 KIP-62 之前的实现:

只要消费者在发送心跳,它基本上就锁定了分配给它的分区。如果进程以这样的方式失效,它无法取得进展,但仍继续发送心跳,则组中的其他成员将无法接管分区,这会导致延迟增加。然而,心跳和处理都在同一个线程中完成的事实保证了消费者必须取得进展才能保持他们的分配。任何影响处理的停顿也会影响心跳。

KIP-62 引入的变化包括:

解耦处理超时:我们建议为记录处理引入一个单独的本地强制超时和一个后台线程以保持会话处于活动状态,直到该超时到期。我们将此新的超时称为“进程超时”,并将其在消费者的配置中公开为 max.poll.interval.ms。此配置设置客户端调用 poll() 之间的最大延迟

从您发布的日志中,我认为您可能处于这种情况下,您的应用程序花费的时间超过max.poll.interval.ms(默认为 5 分钟)来处理 200 条轮询记录。如果您在这种情况下,您只能减少更多max.poll.records或增加max.poll.interval.ms

PD:

您的日志中显示的max.poll.interval.ms配置来自(至少)kafka 0.10.1.0,所以我假设您在那里犯了一个小错误。

更新

如果我理解错了,请纠正我,但在您最后的评论中,您说您正在创建 25 个消费者(例如org.apache.kafka.clients.consumer.KafkaConsumer,如果您使用 java,则为 25 个)并将它们订阅到 N 个不同的主题但使用相同的group.id. 如果这是正确的,您将在每次KafkaConsumer启动或停止 a 时看到重新平衡,因为它会发送一个JoinGrouporLeaveGroup消息(请参阅相应的kafka 协议),其中包含group.idmember.idmember.id不是主机,因此在同一进程中创建的两个消费者仍然有不同的ID)。请注意,这些消息不包含主题订阅信息(尽管该信息应该在代理中,但 kafka 不使用它进行重新平衡)。所以每次集群收到一个JoinGroup或一个LeaveGroup对于group.idX,它将触发所有具有相同group.id X的消费者的重新平衡。

如果您以相同的方式启动 25 个消费者,group.id您将看到重新平衡,直到创建最后一个消费者并且相应的重新平衡结束(如果您继续看到这种情况,您可能会停止消费者)。

几个月前我遇到了这个问题。

如果我们有两个 KafkaConsumer 使用相同的 group.id(在同一个进程或两个不同的进程中运行)并且其中一个已关闭,即使它们订阅了不同的主题,它也会触发另一个 KafkaConsumer 的重新平衡。我想经纪人必须只考虑 group.id 进行再平衡,而不是与 LeaveGroupRequest 的对 (group_id,member_id) 对应的订阅主题,但我想知道这是预期的行为还是应该得到改善?我想这可能是避免在代理中进行更复杂的重新平衡的第一个选择,并且考虑到解决方案非常简单,即只需为订阅不同主题的不同 KafkaConsumer 使用不同的组 ID,即使它们在同一进程中运行。


当重新平衡发生时,我们会看到重复的消息

这是预期的行为,一个消费者消费了消息,但在提交偏移量之前触发了重新平衡并且提交失败。当重新平衡完成时,将分配该主题的进程将再次使用该消息(直到提交成功)。

我分成两组,现在突然问题在过去 2 小时内消失了。

您在这里一针见血,但如果您不想看到任何(可避免的)重新平衡,您应该group.id为每个主题使用不同的。

这是关于不同再平衡方案的精彩讨论。


推荐阅读