首页 > 解决方案 > 重新消费未提交偏移量的消息

问题描述

我有一个自定义的 Kafka 消费者,用于向 REST API 发送一些请求。根据 API 的响应,我要么提交偏移量,要么在不提交的情况下跳过消息。

最小的例子:

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

现在,当来自 REST API 的响应不以 a 开头时,2不会提交偏移量,但不会重新使用消息。如何强制消费者重新使用未提交偏移量的消息?

标签: javaapache-kafkakafka-consumer-api

解决方案


提交偏移量只是存储消费者当前偏移量(也称为位置)的一种方式。因此,如果它停止,它(或接管的新消费者实例)可以找到它以前的位置并从那里重新开始消费。

因此,即使您不提交,一旦您收到记录,消费者的位置也会移动。如果你想重新消费一些记录,你必须改变消费者的当前位置。

使用 Java 客户端,您可以使用seek().

在您的场景中,您可能想要计算相对于当前位置的新位置。如果是这样,您可以使用 找到当前位置position()


推荐阅读