java - ReplicaFetcher 崩溃 - 第一个偏移量 XXXXXXX 小于下一个偏移量 YYYYYYYY
问题描述
我们有一个由 3 个盒子组成的 2.3 Kafka 集群。几天前,当我们确实将其升级到 2.3 时,我们注意到那些使两个代理上的一个主题分区的 replicaFetcher 线程崩溃的日志消息:
[2019-08-09 15:02:43,520] ERROR [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-21 at offset 57542337 (kafka.server.R
eplicaFetcherThread)
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to __consumer_offsets-21. First offset 57542333 is less than the next offset 57542337. First 10 offsets in append: List(57542333,
57542334, 57542335, 57542336, 57542337, 57542338, 57542339, 57542340, 57542341, 57542342), last offset in append: 57570869. Log start offset = 56949140
at kafka.log.Log.$anonfun$append$2(Log.scala:929)
at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
at kafka.log.Log.append(Log.scala:850)
at kafka.log.Log.appendAsFollower(Log.scala:830)
at kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1(Partition.scala:726)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:717)
at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:733)
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:161)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:317)
at scala.Option.foreach(Option.scala:274)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:306)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:305)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:305)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:305)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
at scala.Option.foreach(Option.scala:274)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
[2019-08-09 15:02:43,524] WARN [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Partition __consumer_offsets-21 marked as failed (kafka.server.ReplicaFetcherThread)
影响是一个代理不能成为这个主题分区的 ISR(实际上第二个代理也有同样的问题,所以我们只有一个 ISR 是领导者)。
我仍然对此消息感到困惑,并且无法正确理解它,因此我无法找到解决此问题的正确方法。我真的很想了解这里发生了什么,但不确定我是否理解下面的代码:
if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
当必须追加记录时,replicaFetcher 如何访问 nextOffset 信息..?不确定了解此分析的确切作用(要附加的当前记录?):
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
和这个 :
nextOffsetMetadata
这是下一批记录..?它如何访问任何“下一个”记录元数据?
如果有人可以澄清这一点,那就太好了。与此同时,摆脱这个问题的解决方案会很好,但我仍然希望清楚地理解它。
编辑:
经过一番研究,有些事情变得更清楚了。nextOffset 只是活动段的最新偏移+1(这些元数据来自 loadSegments() 调用)。
综上所述,这里发生了什么:副本从领导者那里获取段,其开始偏移量低于活动段的最新偏移量。所以我的问题是,为什么副本不只是截断?
雅尼克
解决方案
我们的一个 kafka 集群正在运行 kafka 1.1.1,并且遇到了同样的问题,使用类固醇。在我们的例子中,ReplicaFetcher 崩溃了,并且完全停止了复制。解决方案是删除受感染的分区并让 kafka 从健康的副本重新创建。我们试图修复分区文件夹中不一致的条目,但没有成功。
推荐阅读
- r - 使用 fitdistrplus 中的 fitdist 和不同大小的 beta 二项分布
- opencv - opencv安装问题
- variables - 如何为 a 创建随机分布的布尔变量
那会改变模型吗? - python - SQLAlchemy:如果只使用核心而不是 ORM,我可以获得与 RowProxy 不同的类吗?
- sqlite - SQLite 插入到具有自动增量的一列的表中
- python - NetworkX 绘图:节点位置和大小之间的不同单位/比例?
- sharepoint - 无法打开 .cab 文件
- razor - 用于验证消息的自定义 HTML 助手
- python - WebDriverWait 通过 CSS 选择器查找元素
- javascript - Google API 请求收到错误结果