首页 > 解决方案 > 为什么在流源中没有获得任何新偏移量的情况下触发了新批次?

问题描述

我有多个 spark 结构化流式作业,我看到的通常行为是,只有当 Kafka 中有任何新的偏移量用作创建流式查询的源时,才会触发一个新的批处理。

但是,当我运行这个使用 演示任意有状态操作的示例时mapGroupsWithState,我看到即使 Streaming 源中没有新数据也会触发一个新批处理。为什么会这样,可以避免吗?

Update-1 我修改了上面的示例代码并删除了与状态相关的操作,例如更新/删除它。函数简单地输出零。但是仍然每 10 秒触发一次批处理,netcat 服务器上没有任何新数据。

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._

object Stateful {

  def main(args: Array[String]): Unit = {

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .appName("StructuredSessionization")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Split the lines into words, treat words as sessionId of events
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }

    val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
          0
      }

    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .start()

    query.awaitTermination()
  }
}

case class Event(sessionId: String, timestamp: Timestamp)

case class SessionInfo(
                        numEvents: Int,
                        startTimestampMs: Long,
                        endTimestampMs: Long)

标签: apache-sparkspark-streamingspark-structured-streamingspark-streaming-kafka

解决方案


出现空批次的原因是在 mapGroupsWithState 调用中使用了超时。

根据“Learning Spark 2.0”一书,它说:

“即使该 micro.batch 中没有该键的数据,下一个微批处理也会调用此超时键上的函数。[...] 由于超时是在微批处理期间处理的,因此时间它们的执行不精确,很大程度上取决于触发间隔 [...]。”

由于您已将超时设置为GroupStateTimeout.ProcessingTimeTimeout与查询的触发时间一致,即 10 秒。另一种方法是根据事件时间(即GroupStateTimeout.EventTimeTimeout)设置超时。

GroupState 上的ScalaDocs提供了更多详细信息:

当某个组发生超时时,将对该组调用没有值的函数,并将 GroupState.hasTimedOut() 设置为 true。


推荐阅读