apache-spark - Spark 结构化流式处理 Kafka 微批量计数
问题描述
我正在使用 Spark 结构化流从 Kafka 主题中读取记录;我打算计算 Spark 中每个“微批次”中收到的记录数readstream
这是一个片段:
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.load()
我从文档中了解到 kafka_df 将在 astreamingQuery
启动(接下来)时被懒惰地评估,并且在评估时,它拥有一个微批次。所以,我想先做一个主题,然后做一个应该工作groupBy
。count
像这样:
val counter = kafka_df
.groupBy("topic")
.count()
现在要评估所有这些,我们需要一个 streaminQuery,比如说,一个控制台接收器查询以在控制台上打印它。这就是我看到问题的地方。DataFrames上的流式aggregate
查询,例如kafka_df
仅适用于outputMode
complete/update而不适用于append。
这实际上意味着,streamingQuery 报告的计数是累积的。
像这样:
val counter_json = counter.toJSON //to jsonify
val count_query = counter_json
.writeStream.outputMode("update")
.format("console")
.start() // kicks of lazy evaluation
.awaitTermination()
在受控设置中,其中:
实际发布记录:1500
实际接收微批次:3
实际接收记录:1500
每个微批次的计数应该是500,所以我希望(希望)查询打印到控制台:
主题:测试
计数:500
主题:测试
计数:500
主题:测试
计数:500
但事实并非如此。它实际上打印:
主题:测试
计数:500
主题:测试
计数:1000
主题:测试
计数:1500
我理解这是因为'outputMode'完成/更新(累积)
我的问题:是否有可能准确地得到每个微批次的计数是 Spark-Kafka 结构化流式传输?
从文档中,我发现了水印方法(支持追加):
val windowedCounts = kafka_df
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
.count()
val console_query = windowedCounts
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
但是这样的结果console_query
是不准确的,而且看起来很离谱。
TL;DR - 任何关于准确计算 Spark-Kafka 微批处理中记录的想法都将不胜感激。
解决方案
“TL;DR - 任何关于准确计算 Spark-Kafka 微批处理中记录的想法都将不胜感激。”
您可以使用StreamingQueryListener
( ScalaDocs ) 计算从 Kafka 获取的记录。
这允许您打印出从订阅的 Kafka 主题接收到的确切行数。该onQueryProgress
API 在每个微批处理期间都会被调用,并包含有关您的查询的大量有用元信息。如果没有数据流入查询,则每 10 秒调用一次 onQueryProgress。下面是一个打印输入消息数量的简单示例。
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("NumInputRows: " + queryProgress.progress.numInputRows)
}
})
如果您正在验证结构化流查询的性能,通常最好关注以下两个指标:
queryProgress.progress.inputRowsPerSecond
queryProgress.progress.processedRowsPerSecond
如果输入高于处理,您可能会增加作业资源或减少最大限制(通过减少 readStream 选项 maxOffsetsPerTrigger)。如果处理的更高,您可能需要增加此限制。
推荐阅读
- python - 如何在 Python 中裁剪 SVG 图像?
- jquery - JQuery:使用通配符查找数据属性
- reactjs - 在承诺中反应 useState setter 不更新
- angular - resolveJsonModule 不能与 Angular 10 一起使用?
- javascript - 使用没有页眉和页脚的 JS 将 Html 转换为 MS Word
- firebase-realtime-database - 使用 SwiftUI Xcode 11 从 firebase 实时数据库中检索数据
- django - 方法在移动到下一行之前等待 asyncio 完成
- python-3.x - 并行运行测试时仅运行一次 pytest 夹具
- resharper - Resharper Go To Decompiled sources 仅显示元数据
- php - Laravel 模型 whereOrCreate