java - kafka消费者endOffsets的一致性
问题描述
我有一个复制因子为 3 的 kafka 主题,以及min.insync.replicas = 2
一个将 X 消息发送到该主题的生产者acks=all
。在所有发送到主题的消息之后(1 分钟内),使用 java kafka 客户端为此主题创建新的消费者。consumer.endOffsets()
获取该主题的所有 kafka 分区的方法结束偏移量。同一方法的另一次调用consumer.endOffsets
有时会为某些分区返回不同的结束偏移量。
在此设置中,在创建消费者后没有新消息发送到 kafka 主题。
根据java文档endOffsets
:
/**
* Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming
* message, i.e. the offset of the last available message + 1. If messages have never been written
* to the the partition, the offset returned will be 0.
*
* <p>
* This method does not change the current consumer position of the partitions.
* <p>
* When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
* This is the offset of the first message with an open transaction. The LSO moves forward as transactions
* are completed.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
endOffsets 返回所有副本确认的最后稳定偏移量 (LSO)。
为什么有时(不是经常)结束偏移量在随后调用此方法之间发生变化?endOffsets 最终一致是预期的行为吗?虫子?
解决方案
推荐阅读
- amazon-web-services - 当 API Gateway 返回错误 500 时发送 Slack 通知
- angular - HashLocationStrategy 在 Angular 7 中不起作用
- ios - 将字典数组转换为字典数组
- amazon-web-services - 跨账户 SNS 在第二个账户订阅 Lambda
- excel - 我的页面加载缓慢。代码需要更快有什么建议吗?
- asp.net-core - 启动 ASP.NET 3.0 应用程序时出现 HTTP 500 错误
- r - ggplot2 绘制两条线之间的角度
- java - JavaFx TableView,无法将数据从mysql数据库检索到表
- mysql - 在MYSQL的父表中存储子级数量的最佳方法
- node.js - $out ,$projection 在 mongodb