apache-spark - Spark Stateful Structured Streaming:状态在 mapGroupsWithState 中变得太大
问题描述
我正在尝试使用mapGroupsWithState方法为我的传入数据流进行有状态结构化流。但我面临的问题是我为groupByKey选择的键使我的状态太大太快。显而易见的出路是更改密钥,但我希望在更新方法中应用的业务逻辑要求密钥与我现在拥有的完全相同,或者如果可能,访问所有密钥的GroupState。
例如,我有一个来自各个组织的数据流,通常一个组织包含 userId、personId 等。请参见下面的代码:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation", s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
案例类:
case class User(
orgId: Long,
profileId: Long,
userId: Long)
case class UserStatistic(
orgId: Long,
known: Long,
uknown: Long,
userSeq: Seq[User])
更新方法:
def updateUserStatistic(
orgId: Long,
newEvents: Iterator[User],
oldState: GroupState[UserStatistic]): UserStatistic = {
var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId, 0L, 0L, Seq.empty)
for (event <- newEvents) {
//business logic like checking if userId in this organization is of certain type and then accordingly update the known or unknown attribute for that particular user.
oldState.update(state)
state
}
当我必须在 Driver-Executor 模型上执行此操作时,问题会变得更糟,因为我预计每个组织中有 1-1000 万用户,这可能意味着单个执行器上有这么多状态(如果我理解错误,请纠正我。)
可能失败的解决方案:
- 按用户 ID 键分组 - 因为那时我无法获取给定 orgId 的所有用户 ID,因为这些 GroupStates 放在聚合键、值对中,这里是用户 ID。因此,对于每个新的 UserId,都会创建一个新状态,即使它属于同一个组织。
任何帮助或建议表示赞赏。
解决方案
您的状态不断增加,因为在当前实现中,不会从 GroupState 中删除任何键/状态对。
为了准确缓解您面临的问题(无限增加状态),该mapGroupsWithState
方法允许您使用Timeout。您可以在两种超时类型之间进行选择:
- 使用
GroupStateTimeout.ProcessingTimeTimeout
withGroupState.setTimeoutDuration()
或 GroupStateTimeout.EventTimeTimeout
使用with的事件时间超时GroupState.setTimeoutTimestamp()
。
请注意,它们之间的区别是基于持续时间的超时和更灵活的基于时间的超时。
在特性的 ScalaDocs 中,GroupState
您会找到一个很好的模板,介绍如何在映射函数中使用超时:
def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
if (state.hasTimedOut) { // If called when timing out, remove the state
state.remove()
} else if (state.exists) { // If state exists, use it for processing
val existingState = state.get // Get the existing state
val shouldRemove = ... // Decide whether to remove the state
if (shouldRemove) {
state.remove() // Remove the state
} else {
val newState = ...
state.update(newState) // Set the new state
state.setTimeoutDuration("1 hour") // Set the timeout
}
} else {
val initialState = ...
state.update(initialState) // Set the initial state
state.setTimeoutDuration("1 hour") // Set the timeout
}
...
// return something
}
推荐阅读
- excel - 如何更改数组中的数据在工作表中的显示方式
- r - 在R中拆分数字向量
- autodesk-forge - 使用 Javascript 下载 BIM360 Docs 文件
- java - 使用 CompleteableFuture 异步执行两个方法
- c++ - 一个函数中的 C++ 快速排序,具有 2 个参数(___ _____,int 长度)
- php - 使用 Ajax 发送表单数据时 PHP 没有得到 $_FILES
- powershell - 如何删除 PowerShell 说不存在的文件?
- ios - 快速完成块不按顺序执行任务?
- elasticsearch - 如何在弹性搜索查询中搜索文档的排名?
- c# - Nullable.GetUnderlyingType(string) returns null