apache-spark - 来自 kafka 的火花流:numInputRows 与微批处理 df.count() 不匹配
问题描述
Databricks UI 显示以下数字:
"batchId" : 2,
"numInputRows" : 311780,
"inputRowsPerSecond" : 9306.587863048864,
"processedRowsPerSecond" : 1168.240407673861,
但是当我在 foreachBatch 中打印 microatch 的 df.count() 时,它会显示不同的数字。我正在做的一个简化版本是这样的:
def handleMicroBatch(microBatchDF: org.apache.spark.sql.Dataset[Row], batchId: Long) = {
val batchSize = microBatchDF.count
println(s"batchSize: ${batchSize}, batchId: ${batchId}")
//...more code...
}
//output looks like:
batchSize: 995, batchId: 0
batchSize: 22702, batchId: 1
batchSize: 155890, batchId: 2
如您所见,计数与批次 id#2 不匹配。我的理解是每个 kafka 偏移量都是一个 Row,因此特定批次 id 的 numInputRows 应该与我使用 spark.readSram 读取的内容相匹配。为什么数字不匹配?
解决方案
推荐阅读
- sql - 如何通过将下一行的值与前一个值进行比较来找到下一行的值?
- javascript - 重新渲染 MathJax 文本?
- docker - 在 Kubernetes 中创建部署时引用 Dockerfile 的正确方法是什么?
- javascript - 创建 IDP SAML 响应
- java - 在java中使用new关键字而不定义变量
- sql - 具有超过 2 个结果列的透视/取消透视 oracle sql 查询
- python - 一个pandas列的累积总和,直到达到最大值,平均相邻行
- bash - shell 脚本中需要一元运算符
- javascript - 等待异步动作创建者更新另一个动作创建者内部的状态
- keras - 我可以为 pytorch/keras 提供显式梯度/粗麻布来加速学习吗?