首页 > 解决方案 > 如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?

问题描述

使用 Java Kafka Consumerseek()函数,它需要我们传入TopicPartionand Offest。但是,我认为这个 seek 方法会为我的消费者获取订阅的 TopicPartitions 集合。

这是我要处理的示例。

消费者 A 订阅了主题“test-topic”分区 1 和 2。当我调用poll(). 我处理了一些消息,但我的应用程序出现异常。我不叫commitSync()。现在我想回到我上次检索到的那些偏移量,poll()并尝试重新处理它们。那么我该怎么做呢?我是否需要检查每个主题分区的最后提交的偏移量并调用seek()每个分区?seek()多次调用是否只接受最后一次调用seek()?正如我所说,我想确保我的消费者返回所有分区,这样我就不会丢失任何已分配分区上的任何数据。

标签: apache-kafka

解决方案


我处理了一些消息,但我的应用程序出现异常。我不调用 commitSync()

如果您不调用commitSync(),则不会提交消息。如果假设异常杀死了您的程序,那么在重新启动后,消费者通常会从最后提交的偏移量中读取它。

您可能还需要检查auto.offset.reset并将其设置为earliest.

检查您的消息是否自动提交,因为您commitSync()不需要自动提交,即enable.auto.commit可以设置为false(默认情况下它true在 Confluent Kafka 中)

如果您的程序没有被异常终止,您始终拥有已使用的记录。您可以重试处理每条记录,然后提交。

ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord record: records)
{
   tryProcess(record, 3);
}
consumer.commitSync();

void tryProcess(ConsumerRecord record, int maxRetries) {
    if(maxRetries < 1) {
        log.warn("max retries exhausted for record");
        return;
    }
    try {
         process(record);
    } catch(Exception ex){ 
        tryProcess(record, --maxRetries);
    }
}

您还可以尝试通过重试来处理批量记录,而不是像tryProcess(records, 3)记录对应的每条记录那样ConsumerRecords重试 3 次。我不认为有寻求的必要。


不过,我仍然对 seek() api 的用法感到好奇

seek()例如,当我们不使用订阅时,可能会使用它,consumer.subscribe()而是consumer.assign()当我们只想查看(查看)主题中的消息时通常会这样做,例如控制台消费者。有时,我们可能需要在某个偏移量之后查看一些消息,或者最后n条消息等,而实际上并没有对它们做任何事情,而只是显示。


推荐阅读