首页 > 解决方案 > 卡夫卡 - 偏移提交和寻求

问题描述

我目前正在从具有特定偏移量的主题中获取消息。我正在使用seek()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(commitSync()/commitAsync())时,Seek() 不起作用,因为它没有从特定偏移量轮询消息,而是从最后一个承诺的偏移量。

因此,当使用Seek()时,是否必须将偏移量存储在外部数据库中而不提交给 Kafka ?Seek 和 Commit 不会并行工作?

客户端版本 - kafka-clients - 2.4.0

谢谢!!

标签: javaapache-kafkakafka-consumer-api

解决方案


当您提交时(自动或手动几乎没有区别),您将在代理端存储消费者在分区中到达多远的记录。这个提交的偏移量只在重新平衡的情况下使用,因此当消费者被分配到该分区时,他们可以从已知所有先前消息都已处理的点处获取。这提供了一个保证,只要消费者被正确编码,当消息被顺序处理时,如果组成员发生变化,消息就不会在消费时丢失。

当组成员身份稳定时,提交的偏移量什么也不做。每个消费者都有自己的内存偏移量,它维护并在每次从代理获取一批记录时使用。默认情况下,此偏移量按顺序增加。seek 方法仅更改此内存中的偏移量,以便下一次轮询将从您指定的任意偏移量中获取,除非它不存在,在这种情况下将引发异常。

如果您在外部存储提交偏移量,则可以在重新平衡后使用 seek 来检索外部存储的偏移量并从那里获取,但在这种情况下,您必须在 RebalanceListener 中调用 seek - 如果您在 poll 之前调用 seek 它将没有效果由于消费者只在轮询方法期间发现重新平衡和新分区分配,因此在轮询期间不进行干预,它将从最后提交的偏移量开始使用。

当您暂停消费者时,也会出现这种稍微不直观的情况,这是我在https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html?m=1上写的


推荐阅读