首页 > 解决方案 > Spark Stateful Structured Streaming:状态在 mapGroupsWithState 中变得太大

问题描述

我正在尝试使用mapGroupsWithState方法为我的传入数据流进行有状态结构化流。但我面临的问题是我为groupByKey选择的键使我的状态太大太快。显而易见的出路是更改密钥,但我希望在更新方法中应用的业务逻辑要求密钥与我现在拥有的完全相同,或者如果可能,访问所有密钥的GroupState

例如,我有一个来自各个组织的数据流,通常一个组织包含 userId、personId 等。请参见下面的代码:

val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
    .groupByKey(key => key.orgId)
    .mapGroupsWithState(noTimeout)(updateUserStatistic)

val df = statisticStream.toDF()

val query = df
    .writeStream
    .outputMode(Update())
    .option("checkpointLocation", s"$checkpointLocation/$name")
    .foreach(new UserCountWriter(spark.sparkContext.getConf))
    .outputMode(Update())
    .queryName(name)
    .trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))

案例类:

case class User(
  orgId: Long,
  profileId: Long,
  userId: Long)

case class UserStatistic(
  orgId: Long,
  known: Long,
  uknown: Long,
  userSeq: Seq[User])

更新方法:

def updateUserStatistic(
  orgId: Long, 
  newEvents: Iterator[User], 
  oldState: GroupState[UserStatistic]): UserStatistic = {
    var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId, 0L, 0L, Seq.empty)
    for (event <- newEvents) {
    //business logic like checking if userId in this organization is of certain type and then accordingly update the known or unknown attribute for that particular user.  
    oldState.update(state)
    state
  }

当我必须在 Driver-Executor 模型上执行此操作时,问题会变得更糟,因为我预计每个组织中有 1-1000 万用户,这可能意味着单个执行器上有这么多状态(如果我理解错误,请纠正我。)

可能失败的解决方案:

  1. 按用户 ID 键分组 - 因为那时我无法获取给定 orgId 的所有用户 ID,因为这些 GroupStates 放在聚合键、值对中,这里是用户 ID。因此,对于每个新的 UserId,都会创建一个新状态,即使它属于同一个组织。

任何帮助或建议表示赞赏。

标签: apache-sparkaggregatespark-streamingspark-structured-streaming

解决方案


您的状态不断增加,因为在当前实现中,不会从 GroupState 中删除任何键/状态对。

为了准确缓解您面临的问题(无限增加状态),该mapGroupsWithState方法允许您使用Timeout。您可以在两种超时类型之间进行选择:

  • 使用GroupStateTimeout.ProcessingTimeTimeoutwithGroupState.setTimeoutDuration()
  • GroupStateTimeout.EventTimeTimeout使用with的事件时间超时GroupState.setTimeoutTimestamp()

请注意,它们之间的区别是基于持续时间的超时和更灵活的基于时间的超时。

在特性的 ScalaDocs 中,GroupState您会找到一个很好的模板,介绍如何在映射函数中使用超时:

def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

  if (state.hasTimedOut) {                // If called when timing out, remove the state
    state.remove()

  } else if (state.exists) {              // If state exists, use it for processing
    val existingState = state.get         // Get the existing state
    val shouldRemove = ...                // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()                      // Remove the state

    } else {
      val newState = ...
      state.update(newState)              // Set the new state
      state.setTimeoutDuration("1 hour")  // Set the timeout
    }

  } else {
    val initialState = ...
    state.update(initialState)            // Set the initial state
    state.setTimeoutDuration("1 hour")    // Set the timeout
  }
  ...
  // return something
}

推荐阅读