google-cloud-platform - apache_beam.io.ReadFromBigQuery 在 Dataflow 管道模板中动态传递参数
问题描述
我通过将 GCP Dataflow 管道部署为 GCP 上的模板来运行它。我需要在管道中运行 BigQuery 读取语句。其中条件参数需要动态传递。我该怎么做?
我要运行的查询类似于
select * from tabel_name where field1=[dynamic_value]
这是运行查询的示例代码
import apache_beam as beam
query_string = "select * from tabel_name where field1=[dynamic_value]"
options = {'project': PROJECT_ID, 'runner': 'Direct', "temp_location": "gs://my_bucket/temp",'region': "australia-southeast1", }
pipeline_options = beam.pipeline.PipelineOptions(sys.argv,**options)
custom_options = pipeline_options.view_as(MyOptions)
with beam.Pipeline(options=pipeline_options) as pipeline:
sample = (
pipeline
| 'QueryTable' >> beam.io.ReadFromBigQuery(query=query_string, use_standard_sql=False)
| "Processing" >> beam.ParDo(MyPreprocessor())
| beam.Map(print))
我需要dynamic_value
从命令行选项传递--dynamic_value
。sys.argv
如果它不是模板,我可以将它作为参数传递,但是如果我部署模板,它需要dynamic_value
作为 PipelineOption。如何动态创建查询?令人惊讶的是,该方法beam.io.ReadFromBigQuery
具有将数据集、project_id 和 table 作为参数的机制,但没有任何可以为数据指定过滤器的参数。查询整个表是不必要且昂贵的。有人可以提供相同的解决方案。
解决方案
您应该考虑使用Flex 模板来支持将查询作为模板的完全动态配置。
推荐阅读
- flutter - Flutter:Syncfusion图表为y轴提供上限
- c++ - 我的 winreg 函数出现错误 2
- mysql - 使用 MySql 和 Emojis
- scala - 如何将 udf 应用于 Dafaframe 上的所有字符串和字符串数组
- ruby-on-rails - 如何遍历 ActiveRecord 列列表并替换文本?
- javascript - 当我遇到断点时,WebStorm 在 Vagrant 中打开文件的新只读版本
- python - $pip 命令获取 ImportError: No module named typing
- c++ - 在 C++ 中合并线程协程输出
- nagios - 使用 check_disk 从 centreon 20.4 监控主机
- onclick - 在 Python Turtle 中的事件处理程序中调用函数