首页 > 解决方案 > 使用自定义 ptransform 时使用数据流运行器构建光束管道时出现递归错误

问题描述

我使用 beam 在本地构建并设法运行了一个令人满意的管道,我准备将作业发送到 DataFlow。

我打算只用save_main_session管道选项腌制我的会话,但是在尝试这样做时遇到了递归错误。经过几次试验和错误后,我设法将其缩小到我定义 my 的方式ptransform_fn,使用装饰器。

请在下面找到一个最小的可重现示例

# my_script.py

from typing import Set
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.ptransform import ptransform_fn


@ptransform_fn
def my_function(pcoll):
    return pcoll | beam.Create([1])


if __name__ == "__main__":
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        p | my_function()

完整的回溯很长,但以RecursionError: maximum recursion depth exceeded while calling a Python object

(请注意,这是save_main_session=True启用此错误的 ) 选项,因此我可以python -m my_script使用本地运行器运行它并运行到RecursionError)

由于ptransform_fn实际上my_function是以“非pythonic”的方式进行行为(在没有定义它的参数的情况下调用),所以pickler库似乎有这个问题。

所以我最后的问题是:

标签: pythonpython-3.xapache-beam

解决方案


save_main_session天生就有点脆弱;对于任何重要的事情,我建议将逻辑放在一个命名模块中,该模块可以导入您的主脚本(以及您的工作人员)中。


推荐阅读