apache-kafka - Kafka 控制台消费者在使用 --max-messages 时提交了错误的偏移量
问题描述
我在 1.1.0 版中有一个 kafka 控制台使用者,我用它来从 Kafka 获取消息。当我使用带有选项 --max-messages 的 kafka-console-consumer.sh 脚本时,它似乎提交了错误的偏移量。
我创建了一个主题和一个消费者组并阅读了一些消息:
/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test.offset 1 374 374 0 - - -
test.offset 0 0 375 375 - - -
比我读了 10 条这样的消息:
/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
1 var_1
3 var_3
5 var_5
7 var_7
9 var_9
11 var_11
13 var_13
15 var_15
17 var_17
19 var_19
Processed a total of 10 messages
但是现在偏移显示它读取了一个主题中的所有消息
/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'my-consumer-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test.offset 1 374 374 0 - - -
test.offset 0 375 375 0 - - -
现在,当我想阅读更多消息时,我收到一个主题中没有更多消息的错误:
/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
[2020-02-28 08:27:54,782] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
我做错了什么?为什么偏移量移动到主题中的最后一条消息而不仅仅是 10 条消息?
解决方案
这是关于 Kafka 消费者的自动提交功能。如本链接所述:
提交偏移量的最简单方法是让消费者为您完成。如果您配置 enable.auto.commit=true,那么每五秒消费者将提交您的客户端从 poll() 收到的最大偏移量。5 秒间隔是默认值,由设置 auto.commit.interval.ms 控制。就像消费者中的其他一切一样,自动提交是由轮询循环驱动的。每当您进行轮询时,消费者都会检查是否到了提交的时间,如果是,它将提交它在上次轮询中返回的偏移量。
因此,在您的消费者轮询的情况下,它会接收最多 500 条消息(默认值max.poll.records
),并且在 5 秒后,即使您将 max-messages 指定为 10,它也会提交从上次轮询返回的最大偏移量(在您的情况下为 375)。
--max-messages: 退出前要消耗的最大消息数。如果未设置,则消耗是连续的。
推荐阅读
- html - 文本溢出不适用于响应式菜单中的链接
- redux - React redux 存储状态管理
- json - 字符串插值中的 jq 过滤器表达式
- yaml - Prometheus yaml 文件:未找到预期的密钥
- jupyter-notebook - 如何在 Jupyter Notebook 中抑制不需要的输出
- javascript - Electron 与 Vue,如何在将图像保存到用户数据文件夹后同步显示图像
- ruby-on-rails - 使用不同数量的产品创建订单
- java - 基于 Apache Solr 的 Spring Boot 应用搜索功能
- swift - Swift UIImage:循环中的内存生命周期
- graphdb - GraphDB 免费版与标准版中的并行写入操作