首页 > 解决方案 > Structured Streaming 是如何为每个微批次规划流式查询的逻辑计划的?

问题描述

我在笔记本电脑上设置了一个小测试,它执行以下操作:

我创建了一个包含 1000 条消息的 Kafka 主题,其中每条消息包含几行,每行大约有 100 列。

在 List[Column] 中创建 300 个非常复杂的 Spark 列。没有聚合。

在设置来自 Kafka 的流时,我设置了 .option("maxOffsetsPerTrigger", 1) 以便在每个小批量中只处理一条消息。

然后,我将这些列应用于仅包含一条消息的小批量。

val testStream = myStream
  .select(manyTestCols :_*)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

Spark 处理每条消息大约需要 10 秒。

然后我将 maxOffsetsPerTrigger 更改为 .option("maxOffsetsPerTrigger", 1000) 以便在每个小批量中处理 1000 条消息。

Spark 大约需要 11 秒来处理每个 mini-batch 中的所有 1000 条消息。

因此,Spark 似乎做了某种“设置工作”,然后一旦开始处理每个小批量,就会非常快。

对于每个小批量,这个“设置工作”是否会从查询计划到物理计划?

如果是这样,Spark 对每个 mini-batch 执行此操作是否有意义?

还是完全在发生其他事情?我正在查看 Spark 源代码,但希望得到已经完成此练习的人的反馈。

发送任何见解。

标签: apache-sparkspark-structured-streaming

解决方案


对于每个小批量,这个“设置工作”是否会从查询计划到物理计划?

对于流式查询的查询计划的特定执行部分,在运行时填写如下(带有指向相应代码部分的链接),部分是的:

  1. 数据源的正确关系(例如,无数据源的LocalRelation )
  2. 事件时间水印
  3. 当前(微批次)时间

如果是这样,Spark 对每个 mini-batch 执行此操作是否有意义?

绝对地。结构化流中没有其他方法可以短路无数据源、跟踪当前时间和水印。

这也是当水印发生变化时为有状态的操作员提供额外的无数据微批处理的原因。

我正在查看 Spark 源代码,但希望得到已经完成此练习的人的反馈。

请参阅MicroBatchExecutionIncrementalExecution


推荐阅读