首页 > 解决方案 > 用于处理多个水印的 Spark 策略

问题描述

我正在阅读结构化流媒体文档

一方面,如果我做对了,在处理多个水印的政策下,他们说如果你在两个流上有不同的水印,那么 Spark 将使用最小值(默认情况下)或最大值(如果您明确指定它)作为全局水印(因此 Spark 将忽略另一个)。

另一方面,在Inner Joins with optional Watermarking下,他们有两个具有不同水印的流的示例,并且他们说对于每个流,将使用指定的水印(而不仅仅是最小的一个或最大的一个作为两者的全局水印)。

也许我不明白他们在Policy for handling multiple watermarks下真正试图解释什么,因为他们说如果您multipleWatermarkPolicymax较大的水印意味着流较慢。

标签: apache-sparkjoinbigdataspark-structured-streaming

解决方案


如果据我了解,您想知道多个水印在连接操作中的表现,对吧?我是这样的,我对实现进行了一些研究以找到答案。

全局使用的 multipleWatermarkPolicy 配置

spark.sql.streaming.multipleWatermarkPolicy属性全局用于涉及多个水印的所有操作,其默认值为min。您可以通过查看WatermarkTracker#updateWatermark(executedPlan: SparkPlan)MicroBatchExecution#runBatch. 并且 runBatch 被调用,org.apache.spark.sql.execution.streaming.StreamExecution#runStream它是一个负责...流执行的类;)

updateWatermark执行

updateWatermark首先从物理计划中收集所有事件时间水印节点:

    val watermarkOperators = executedPlan.collect {
      case e: EventTimeWatermarkExec => e
    }
    if (watermarkOperators.isEmpty) return

    watermarkOperators.zipWithIndex.foreach {
      case (e, index) if e.eventTimeStats.value.count > 0 =>
        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
        val prevWatermarkMs = operatorToWatermarkMap.get(index)
        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
          operatorToWatermarkMap.put(index, newWatermarkMs)
        }

      // Populate 0 if we haven't seen any data yet for this watermark node.
      case (_, index) =>
        if (!operatorToWatermarkMap.isDefinedAt(index)) {
          operatorToWatermarkMap.put(index, 0)
        }
    }

为了得到一个想法,流到流连接的物理计划可能如下所示:

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6a1dff1d
+- StreamingSymmetricHashJoin [mainKey#10730], [joinedKey#10733], Inner, condition = [ leftOnly = null, rightOnly = null, both = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms), full = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms) ], state info [ checkpoint = file:/tmp/temporary-3416be37-81b4-471a-b2ca-9b8f8593843a/state, runId = 17a4e028-29cb-41b0-b34b-44e20409b335, opId = 0, ver = 13, numPartitions = 200], 389000, state cleanup [ left value predicate: (mainEventTimeWatermark#10732-T4000ms <= 388999000), right = null ]
   :- Exchange hashpartitioning(mainKey#10730, 200)
   :  +- *(2) Filter isnotnull(mainEventTimeWatermark#10732-T4000ms)
   :     +- EventTimeWatermark mainEventTimeWatermark#10732: timestamp, interval 4 seconds
   :        +- *(1) Filter isnotnull(mainKey#10730)
   :           +- *(1) Project [mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
   :              +- *(1) ScanV2 MemoryStreamDataSource$[mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
   +- Exchange hashpartitioning(joinedKey#10733, 200)
      +- *(4) Filter isnotnull(joinedEventTimeWatermark#10735-T8000ms)
         +- EventTimeWatermark joinedEventTimeWatermark#10735: timestamp, interval 8 seconds
            +- *(3) Filter isnotnull(joinedKey#10733)
               +- *(3) Project [joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
                  +- *(3) ScanV2 MemoryStreamDataSource$[joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]

稍后,updateWatermark使用 和 的可用水印策略之一MinWatermarkMaxWatermark具体取决于您在 中设置的值spark.sql.streaming.multipleWatermarkPolicy。它在MultipleWatermarkPolicy伴随对象中以这种方式解决:

  def apply(policyName: String): MultipleWatermarkPolicy = {
    policyName.toLowerCase match {
      case DEFAULT_POLICY_NAME => MinWatermark
      case "max" => MaxWatermark
      case _ =>
        throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
    }
  }

updateWatermark使用已解析的策略来计算要应用于查询的水印:

    // Update the global watermark to the minimum of all watermark nodes.
    // This is the safest option, because only the global watermark is fault-tolerant. Making
    // it the minimum of all individual watermarks guarantees it will never advance past where
    // any individual watermark operator would be if it were in a plan by itself.
    val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
    if (chosenGlobalWatermark > globalWatermarkMs) {
      logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
      globalWatermarkMs = chosenGlobalWatermark
    } else {
      logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
    }

杂项

但是,我同意前面片段中的评论有点误导,因为它说“将全局水印更新到所有水印节点的最小值”。(https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala#L109

多个水印的行为也在EventTimeWatermarkSuite中断言。尽管它适用于 UNION,但您在前 2 部分中看到,对于所有组合操作,水印的更新方式相同。

要自行调试,请检查日志中的以下条目:

  • [2019-07-05 08:30:09,729] org.apache.spark.internal.Logging$class INFO Streaming query made progress- 返回有关每个已执行查询的所有信息。如果您使用 min 和 max multipleWatermarkPolicy 执行相同的查询,eventTime您会发现watermark应该不同的属性
  • [2019-07-05 08:30:35,685] org.apache.spark.internal.Logging$class INFO Updating event-time watermark from 0 to 6000 ms (org.apache.spark.sql.execution.streaming.WatermarkTracker:54)- 表示水印刚刚更改。如前所述,应该根据 min/max 属性而有所不同。

所以总结一下,从 2.4.0 开始,我们可以选择一个水印(最小或最大)。在 2.4.0 之前,最小水印是默认选择 ( SPARK-24730 )。所以独立于操作类型(内连接,外连接,...),因为水印解析方法对于所有查询都是相同的。


推荐阅读