首页 > 解决方案 > 没有用户状态上下文可用谷歌云数据流?

问题描述

我正在尝试在谷歌云数据流上运行有状态聚合 DoFn,它在其能力矩阵中引用有状态 DoFns,但是我收到以下错误:

例外:请求执行有状态 DoFn,但没有可用的用户状态上下文。这可能意味着当前运行器不支持有状态 DoFns 的执行。

前面的错误发生在这里:

@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):

    BUFFER = BagStateSpec(
        'buffer', 
        PickleCoder()
    )

    STATE = CombiningValueStateSpec(
        'state', 
        PickleCoder(), 
        CombineFn()
    )

    EXPIRY_TIMER = TimerSpec(
        'expiry', 
        TimeDomain.WATERMARK
    )

    def process(
            self, 
            element,
            w=beam.DoFn.WindowParam,
            buffer=beam.DoFn.StateParam(BUFFER),
            state=beam.DoFn.StateParam(STATE),
            expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
    ):

                expiry_timer.set(w.end+self.allowed_lateness)
                buffer.add(event)
                state.add(event)

    @on_timer(EXPIRY_TIMER)
    def expiry(
        self,
        state=beam.DoFn.StateParam(STATE),
        buffer=beam.DoFn.StateParam(BUFFER)
    ):

            events = buffer.read()
            info = state.read()

            yield [(info, events)]

如何绕过这个?

标签: pythongoogle-cloud-dataflowapache-beamstateful

解决方案


Dataflow 现在支持 Python 中流式管道的用户状态。这是通过您可以使用激活的新功能实现的,该功能--experiments use_runner_v2为部分 Dataflow 工作器使用了新架构。

Runner V2 功能很快就会成为默认功能,所有流水线都将默认支持用户状态。

注意:要允许批处理管道中的用户状态,请联系 Dataflow 支持以让您的项目包含该功能。


推荐阅读