首页 > 解决方案 > apache_beam.io.ReadFromBigQuery 在 Dataflow 管道模板中动态传递参数

问题描述

我通过将 GCP Dataflow 管道部署为 GCP 上的模板来运行它。我需要在管道中运行 BigQuery 读取语句。其中条件参数需要动态传递。我该怎么做?

我要运行的查询类似于

select * from tabel_name where field1=[dynamic_value]

这是运行查询的示例代码

    import apache_beam as beam
    query_string = "select * from tabel_name where field1=[dynamic_value]"

    options = {'project': PROJECT_ID, 'runner': 'Direct', "temp_location": "gs://my_bucket/temp",'region': "australia-southeast1", }
    pipeline_options = beam.pipeline.PipelineOptions(sys.argv,**options)
    custom_options = pipeline_options.view_as(MyOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        sample = (
                pipeline
                | 'QueryTable' >> beam.io.ReadFromBigQuery(query=query_string, use_standard_sql=False)
                | "Processing" >> beam.ParDo(MyPreprocessor())
                | beam.Map(print))

我需要dynamic_value从命令行选项传递--dynamic_valuesys.argv如果它不是模板,我可以将它作为参数传递,但是如果我部署模板,它需要dynamic_value作为 PipelineOption。如何动态创建查询?令人惊讶的是,该方法beam.io.ReadFromBigQuery具有将数据集、project_id 和 table 作为参数的机制,但没有任何可以为数据指定过滤器的参数。查询整个表是不必要且昂贵的。有人可以提供相同的解决方案。

标签: google-cloud-platformgoogle-bigquerygoogle-cloud-dataflowapache-beam

解决方案


您应该考虑使用Flex 模板来支持将查询作为模板的完全动态配置。


推荐阅读