首页 > 解决方案 > 读取数据流模板中的值提供程序参数

问题描述

我有云功能,可以在加载新文件时启动数据流模板并将 GS 上的路由传递给该文件。我必须将其作为价值提供者阅读并像这样在管道中传递它。

 class DataFlowOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input')

dataflow_options = PipelineOptions().view_as(DataFlowOptions)

pipeline = apache_beam.Pipeline(options = dataflow_options)

(pipeline
    | fileio.MatchFiles(dataflow_options.input)
    | fileio.ReadMatches()
    | apache_beam.FlatMap(lambda f: csv.DictReader(io.TextIOWrapper(f.open())))
    # ...
)

但是当模板启动时出现此错误: AttributeError("'RuntimeValueProvider' object has no attribute 'strip'")

我该如何解决这个问题?

标签: google-cloud-platformgoogle-cloud-dataflowapache-beam

解决方案


不幸的是,根据文档, MatchFiles 方法似乎不接受 ValueProvider 参数,只接受字符串。一种解决方法是改用MatchAll方法,因为它使用 PCollection 作为输入。通过创建输出到使用 ValueProvider 的自定义 DoFn的Impulse 转换,您可以输出包含 ValueProvider 值作为单个元素的 PCollection,MatchAll 将读取该值。


推荐阅读