首页 > 解决方案 > ProcessWindowFunction 中的 Apache Flink 状态

问题描述

我试图了解可以在 ProcessWindowFunction 中使用的各种状态的差异。

首先,ProcessWindowFunction 是一个 AbstractRichFunction

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
    extends AbstractRichFunction {...}

因此它可以使用该方法

public RuntimeContext getRuntimeContext() 

得到一个状态

getRuntimeContext().getState

另外,WindowProcessFunction的处理函数

def process(key: KEY, context: Context, elements: Iterable[IN], out: 
    Collector[OUT]) {}

有一个上下文,两种方法再次允许我获取状态:

/**
  * State accessor for per-key and per-window state.
  */
def windowState: KeyedStateStore

/**
  * State accessor for per-key global state.
  */
def globalState: KeyedStateStore

这是我的问题:

1) 这些与 getRuntimeContext().getState 有什么关系?

2)我经常使用自定义触发器实现和 GlobalWindow。在这种情况下,使用 getPartitionedState 检索状态。我可以在触发函数中访问在 WindowProcessFunction 中定义的窗口状态吗?如果有怎么办?

3)Trigger类中没有open方法可以覆盖,状态创建如何处理?只调用同时管理状态创建的 getPartitionedState 是否安全?

标签: apache-flink

解决方案


  1. 列表项getRuntimeContext().getState调用等效于globalStatea ProcessWindowFunction.Context。两者都是“全局”状态,与windowState. “全局”意味着状态在具有相同键的所有窗口之间共享。windowState每个窗口都是独立的,即使对于同一个键也是如此。请记住,即使是“全局”状态也不会在不同的键之间共享。
  2. 在我看来,TriggerContext#getPartitionedState()并且ProcessWindowFunction.Context#globalState()指向同一件事。
  3. 基于我发现的代码和一个示例(org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger):是的,getPartitionedState()如果之前没有创建状态,应该处理它的创建。

推荐阅读