首页 > 解决方案 > 强制 kafka 消费者轮询具有最高延迟的分区

问题描述

我有一个设置,其中几个KafkaConsumers每个处理一个主题的多个分区。它们被静态分配分区,以确保每个消费者有相同数量的分区来处理。还选择了记录键,以便我们在所有分区上平均分配消息。

在负载较重的时候,我们经常看到少数分区建立了相当大的延迟(数千条消息/几分钟),而其他获得相同负载并被同一消费者消费的分区设法保持延迟减少到几百条消息/几秒钟。

看起来消费者正在尽可能快地获取记录,绕过大部分分区,但有时会有一个分区被遗漏很长时间。理想情况下,我希望看到滞后在分区中分布得更均匀。

我已经阅读KafkaConsumer了一段时间的民意调查行为和配置,到目前为止,我认为有两个选项可以解决这个问题:

  1. 构建一些自定义的东西,可以监控每个分区的延迟,并使用KafkaConsumer.pause().resume()从本质上强制KafkaConsumer从具有最大延迟的分区中读取
  2. 限制我们KafkaConsumer只订阅一个TopicPartition,并使用多个实例KafkaConsumer

这些选项似乎都不是处理此问题的正确方法。配置似乎也没有答案:

我是否错过了鼓励KafkaConsumer更频繁地切换分区的方法?或者一种实现对延迟最高的分区的偏好的方法?

标签: javascalaapache-kafkakafka-consumer-api

解决方案


不确定答案是否仍然与您相关,或者我的答案是否完全符合您的需求,但是,您可以尝试延迟感知分配者。这个将分区分配给消费者的分配器确保消费者被分配分区,以便消费者之间的滞后被均匀/平等地分配。这是一个编写良好的代码,我使用它实现了基于延迟的分配器。

https://github.com/grantneale/kafka-lag-based-assignor

您需要做的只是配置您的消费者以使用此分配器。下面的说法。

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, LagBasedPartitionAssignor.class.getName());

推荐阅读