首页 > 解决方案 > kafka消费者endOffsets的一致性

问题描述

我有一个复制因子为 3 的 kafka 主题,以及min.insync.replicas = 2一个将 X 消息发送到该主题的生产者acks=all。在所有发送到主题的消息之后(1 分钟内),使用 java kafka 客户端为此主题创建新的消费者。consumer.endOffsets()获取该主题的所有 kafka 分区的方法结束偏移量。同一方法的另一次调用consumer.endOffsets有时会为某些分区返回不同的结束偏移量。

在此设置中,在创建消费者后没有新消息发送到 kafka 主题。

根据java文档endOffsets

/**
     * Get the last offset for the given partitions.  The last offset of a partition is the offset of the upcoming
     * message, i.e. the offset of the last available message + 1.  If messages have never been written
     * to the the partition, the offset returned will be 0.
     *
     * <p>
     * This method does not change the current consumer position of the partitions.
     * <p>
     * When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
     * This is the offset of the first message with an open transaction. The LSO moves forward as transactions
     * are completed.
     *
     * @see #seekToEnd(Collection)
     *
     * @param partitions the partitions to get the end offsets.
     * @return The end offsets for the given partitions.
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
     *         expiration of the configured {@code request.timeout.ms}
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) 

endOffsets 返回所有副本确认的最后稳定偏移量 (LSO)。

为什么有时(不是经常)结束偏移量在随后调用此方法之间发生变化?endOffsets 最终一致是预期的行为吗?虫子?

标签: javaapache-kafkakafka-consumer-api

解决方案


推荐阅读