首页 > 解决方案 > 来自 kafka 的火花流:numInputRows 与微批处理 df.count() 不匹配

问题描述

Databricks UI 显示以下数字:

  "batchId" : 2,
  "numInputRows" : 311780,
  "inputRowsPerSecond" : 9306.587863048864,
  "processedRowsPerSecond" : 1168.240407673861,

但是当我在 foreachBatch 中打印 microatch 的 df.count() 时,它会显示不同的数字。我正在做的一个简化版本是这样的:

def handleMicroBatch(microBatchDF: org.apache.spark.sql.Dataset[Row], batchId: Long) = {      
            val batchSize = microBatchDF.count                
            println(s"batchSize: ${batchSize}, batchId: ${batchId}")
            //...more code...
}
//output looks like:
batchSize: 995, batchId: 0
batchSize: 22702, batchId: 1
batchSize: 155890, batchId: 2

如您所见,计数与批次 id#2 不匹配。我的理解是每个 kafka 偏移量都是一个 Row,因此特定批次 id 的 numInputRows 应该与我使用 spark.readSram 读取的内容相匹配。为什么数字不匹配?

标签: apache-sparkdatabricksspark-structured-streamingspark-streaming-kafka

解决方案


推荐阅读