首页 > 解决方案 > 读取事务写入的最后一个 Kafka 分区记录

问题描述

我正在尝试读取主题分区的最后一条记录。该主题的生产者正在事务性地写作。消费者设置为isolation.level=read_committed。我手动管理偏移量。这是我的消费者代码:

// Fetch the end offset of a partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
Long endOffset = endOffsets.get(topicPartition);

// Try to read the last record
if (endOffset > 0) {
    consumer.seek(topicPartition, Math.max(0, endOffset - 5));
    List records = consumer.poll(10 * 1000).records(topicPartition);

    if (records.isEmpty()) {
        throw new IllegalStateException("How can this be?");
    } else {
        // Return the last record
        return records.get(records.size() - 1);
    }
}

所以要读取最后一条记录,我要求结束偏移量,然后寻找endOffset - 5(因为 Kafka 在完全一次模式下跳过偏移量,就像我在这个问题中看到的那样:Kafka Streams does not increase offset by 1 when production to topic) ,并开始轮询。

它以前一直运行良好,但现在我有一个异常告诉我轮询零记录。这可能是什么原因?我无法复制它,我想我迷路了。

标签: apache-kafka

解决方案


推荐阅读