首页 > 解决方案 > Flink 中 KeyedBroadcastProcessFunction 的键控状态如何管理?

问题描述

BroadcastState用来在 Flink 中执行流式计算。我已经KeyedBroadcastProcessFunction为我的工作定义了一个扩展类。假设我有一个由 键控的流 A(user_id, location)和一个流 B,它被广播到所有执行程序以使用我定义的类处理 A 中的元素。我知道我可以在这个类中processBroadcastElementprocessElement在这个类中注册一个计时器,这样当它超时时,我可以通过调用来删除特定键组的关联状态state.clear()。我想知道之后,这个关键组是否仍然存在?

例如,在流 A 中,带有一条新消息(user_id=1, location='usa'),我们生成了这样的密钥组及其相关状态。之后,如果另一个消息(user_id=1, location='usa')来了,它将触发processElement()并发出结果。

说 24 小时后,我对这个键组不再感兴趣(user_id=1, location='usa'),我可以注册一个定时器来清除关联状态,但是我无法控制这个键组。结果,24小时后,当另一个消息来的时候(user_id=1, location='usa'),由于这个密钥组仍然存在,processElement()仍然会被调用。随着作业的运行,尽管它们的关联状态将在 24 小时后被清除,但关键组是否会累积或不应该成为内存使用的问题?

相关博客:https ://www.da-platform.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink

标签: apache-flinkflink-streaming

解决方案


Flink 的键控状态被组织为分布式(或分片)键值存储,其中键可以是简单的东西,如整数和字符串,也可以是复合物,如 (user_id=1, location='usa')。键组复合键不同。键组是 Flink 1.2 中引入的运行时构造(请参阅FLINK-3755),以允许有效地重新调整键值状态。密钥组是密钥空间的子集,并作为一个独立的单元设置检查点。在运行时,同一个键组中的所有键在作业图中被分区在一起——每个子任务都有一个或多个完整键组的键值状态。这个设计文档提供更多细节。作为使用 DataStream API 的用户,密钥组是一个实现细节,而不是您直接使用的东西。

至于 a 中的定时器KeyedBroadcastProcessFunction,可以在processElementoronTimer方法中注册,但不能在processBroadcastElement方法中注册。这是因为定时器总是与一个键相关联,并且没有与广播元素相关联的键。但是,您可以processBroadcastElement通过使用对象上的applyToKeyedState方法在方法期间操作任何或所有键控状态KeyedBroadcastProcessFunction.Context。有关更多详细信息,请参阅文档

调用 state.clear() 后,该键的状态条目将被删除。当然,该键的新流事件可能会在状态清除后到达,如果您愿意,您可以再次存储该键的值状态。为了避免由于保持不再相关键的状态而导致无限的内存使用,您需要小心。您可能希望像这样的一些逻辑在每次创建后 24 小时使状态过期:

processElement:
  if state.value() is null, register timer
  state.update(...)

onTimer:
  state.clear()

或者,您可能需要更复杂的逻辑,以便在更新或访问状态时延长状态的生命周期。

另一种选择是使用状态生存时间功能。

更新:

每当您在任何 ProcessFunction 类型的processElementoronTimer方法中时,上下文中都会隐含一个特定的键,并且对键控状态(例如.update()or .clear())所做的任何事情都只会影响该键的状态。

广播状态的工作方式不同。广播状态始终为 MapState,并被复制到所有并行子任务中。广播状态是无键的——如果您在processElement方法期间读取广播状态,您将看到广播状态的相同值,而不管该调用期间上下文中的键是什么。

只有在processBroadcastElementa 的方法中,KeyedBroadcastProcessFunction您才能修改(或清除)广播状态,重要的是在所有并行实例中以相同的方式进行任何修改(或删除)。这样设计是为了保证每个并行实例在广播状态下都具有相同的内容。忽略此规则将导致状态不一致,这可能非常难以调试。有关更多信息,请参阅文档

所以是的,如果你在广播状态上调用 .clear() ,那么所有键的所有广播状态都将被删除。或者您可以从广播状态中删除特定项目(请记住,广播状态是 MapState),在这种情况下,将为所有键删除该特定项目。

在 Flink 培训站点中有几个使用广播状态的示例。看


推荐阅读