java - 强制 kafka 消费者轮询具有最高延迟的分区
问题描述
我有一个设置,其中几个KafkaConsumers
每个处理一个主题的多个分区。它们被静态分配分区,以确保每个消费者有相同数量的分区来处理。还选择了记录键,以便我们在所有分区上平均分配消息。
在负载较重的时候,我们经常看到少数分区建立了相当大的延迟(数千条消息/几分钟),而其他获得相同负载并被同一消费者消费的分区设法保持延迟减少到几百条消息/几秒钟。
看起来消费者正在尽可能快地获取记录,绕过大部分分区,但有时会有一个分区被遗漏很长时间。理想情况下,我希望看到滞后在分区中分布得更均匀。
我已经阅读KafkaConsumer
了一段时间的民意调查行为和配置,到目前为止,我认为有两个选项可以解决这个问题:
- 构建一些自定义的东西,可以监控每个分区的延迟,并使用
KafkaConsumer.pause()
并.resume()
从本质上强制KafkaConsumer
从具有最大延迟的分区中读取 - 限制我们
KafkaConsumer
只订阅一个TopicPartition
,并使用多个实例KafkaConsumer
。
这些选项似乎都不是处理此问题的正确方法。配置似乎也没有答案:
max.partition.fetch.bytes
仅指定单个分区的最大获取大小,它不保证下一次获取将来自另一个分区。max.poll.interval.ms
仅适用于消费者组,而不适用于每个分区。
我是否错过了鼓励KafkaConsumer
更频繁地切换分区的方法?或者一种实现对延迟最高的分区的偏好的方法?
解决方案
不确定答案是否仍然与您相关,或者我的答案是否完全符合您的需求,但是,您可以尝试延迟感知分配者。这个将分区分配给消费者的分配器确保消费者被分配分区,以便消费者之间的滞后被均匀/平等地分配。这是一个编写良好的代码,我使用它实现了基于延迟的分配器。
https://github.com/grantneale/kafka-lag-based-assignor
您需要做的只是配置您的消费者以使用此分配器。下面的说法。
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, LagBasedPartitionAssignor.class.getName());
推荐阅读
- reactjs - 动态 React 组件渲染不会随着 map() 的每次新迭代而改变
- matplotlib - matplotlib 标签中的下标*不是斜体*?
- ruby - 安装 gem 后 VSCode 找不到 Ruby Gems - “运行 `gem install solargraph` 或更新您的 Gemfile。”
- html5-canvas - 如何将 p5.JS 画布创建代码转换为 HTML 画布
- django - 无法在视图中编辑 django 表单 empty_form 的字段
- qt - QT GUI 不会出现在 X11 转发上
- php - 在PHP和SQL中删除记录后删除记录输出else语句3次
- apache-spark-sql - 使用 Spark flatmap 展平嵌套数据,其中嵌套列表在 java 中以二进制形式存储
- android - Android虚拟内存,分页还是分段?
- cakephp - Netbeans 中 CakePHP 4 上的 Xdebug 说“找不到路由”