google-cloud-platform - 读取数据流模板中的值提供程序参数
问题描述
我有云功能,可以在加载新文件时启动数据流模板并将 GS 上的路由传递给该文件。我必须将其作为价值提供者阅读并像这样在管道中传递它。
class DataFlowOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input')
dataflow_options = PipelineOptions().view_as(DataFlowOptions)
pipeline = apache_beam.Pipeline(options = dataflow_options)
(pipeline
| fileio.MatchFiles(dataflow_options.input)
| fileio.ReadMatches()
| apache_beam.FlatMap(lambda f: csv.DictReader(io.TextIOWrapper(f.open())))
# ...
)
但是当模板启动时出现此错误: AttributeError("'RuntimeValueProvider' object has no attribute 'strip'")
我该如何解决这个问题?
解决方案
不幸的是,根据文档, MatchFiles 方法似乎不接受 ValueProvider 参数,只接受字符串。一种解决方法是改用MatchAll方法,因为它使用 PCollection 作为输入。通过创建输出到使用 ValueProvider 的自定义 DoFn的Impulse 转换,您可以输出包含 ValueProvider 值作为单个元素的 PCollection,MatchAll 将读取该值。