python - 没有用户状态上下文可用谷歌云数据流?
问题描述
我正在尝试在谷歌云数据流上运行有状态聚合 DoFn,它在其能力矩阵中引用有状态 DoFns,但是我收到以下错误:
例外:请求执行有状态 DoFn,但没有可用的用户状态上下文。这可能意味着当前运行器不支持有状态 DoFns 的执行。
前面的错误发生在这里:
@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):
BUFFER = BagStateSpec(
'buffer',
PickleCoder()
)
STATE = CombiningValueStateSpec(
'state',
PickleCoder(),
CombineFn()
)
EXPIRY_TIMER = TimerSpec(
'expiry',
TimeDomain.WATERMARK
)
def process(
self,
element,
w=beam.DoFn.WindowParam,
buffer=beam.DoFn.StateParam(BUFFER),
state=beam.DoFn.StateParam(STATE),
expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
):
expiry_timer.set(w.end+self.allowed_lateness)
buffer.add(event)
state.add(event)
@on_timer(EXPIRY_TIMER)
def expiry(
self,
state=beam.DoFn.StateParam(STATE),
buffer=beam.DoFn.StateParam(BUFFER)
):
events = buffer.read()
info = state.read()
yield [(info, events)]
如何绕过这个?
解决方案
Dataflow 现在支持 Python 中流式管道的用户状态。这是通过您可以使用激活的新功能实现的,该功能--experiments use_runner_v2
为部分 Dataflow 工作器使用了新架构。
Runner V2 功能很快就会成为默认功能,所有流水线都将默认支持用户状态。
注意:要允许批处理管道中的用户状态,请联系 Dataflow 支持以让您的项目包含该功能。