首页 > 解决方案 > Kafka 控制台消费者在使用 --max-messages 时提交了错误的偏移量

问题描述

我在 1.1.0 版中有一个 kafka 控制台使用者,我用它来从 Kafka 获取消息。当我使用带有选项 --max-messages 的 kafka-console-consumer.sh 脚本时,它似乎提交了错误的偏移量。

我创建了一个主题和一个消费者组并阅读了一些消息:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          0               375             375             -               -               -

比我读了 10 条这样的消息:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
1 var_1
3 var_3
5 var_5
7 var_7
9 var_9
11 var_11
13 var_13
15 var_15
17 var_17
19 var_19
Processed a total of 10 messages

但是现在偏移显示它读取了一个主题中的所有消息

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'my-consumer-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          375             375             0               -               -               -

现在,当我想阅读更多消息时,我收到一个主题中没有更多消息的错误:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
[2020-02-28 08:27:54,782] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

我做错了什么?为什么偏移量移动到主题中的最后一条消息而不仅仅是 10 条消息?

标签: apache-kafkakafka-consumer-api

解决方案


这是关于 Kafka 消费者的自动提交功能。如本链接所述:

提交偏移量的最简单方法是让消费者为您完成。如果您配置 enable.auto.commit=true,那么每五秒消费者将提交您的客户端从 poll() 收到的最大偏移量。5 秒间隔是默认值,由设置 auto.commit.interval.ms 控制。就像消费者中的其他一切一样,自动提交是由轮询循环驱动的。每当您进行轮询时,消费者都会检查是否到了提交的时间,如果是,它将提交它在上次轮询中返回的偏移量。

因此,在您的消费者轮询的情况下,它会接收最多 500 条消息(默认值max.poll.records),并且在 5 秒后,即使您将 max-messages 指定为 10,它也会提交从上次轮询返回的最大偏移量(在您的情况下为 375)。

--max-messages: 退出前要消耗的最大消息数。如果未设置,则消耗是连续的。


推荐阅读