python - 使用数据流模板读取大查询表
问题描述
我想使用 Python 和 Dataflow 读取 BigQuery 中的表。我事先不知道表的名称。我正在使用模板来传递表名,如下所示:
.
.
.
from apache_beam.options.pipeline_options import PipelineOptions
class DataflowOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--table_name',
help='Name of table on BigQuery')
def run(argv=None):
pipeline_options = PipelineOptions()
dataflow_options = pipeline_options.view_as(DataflowOptions)
with beam.Pipeline(options=pipeline_options) as pipeline:
table_spec = bigquery.TableReference(
projectId='MyProyectId',
datasetId='MyDataset',
tableId=str(dataflow_options.table_name))
p = (pipeline | 'Read Table' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
if __name__ == '__main__':
run()
但是当我启动作业时,我收到以下错误:
Workflow failed. Causes: S01:Read Table+Batch Users/ParDo(_GlobalWindowsBatchingDoFn)+Hash Users+Upload to Ads failed., BigQuery getting table "RuntimeValueProvider(option: table_name, type: str, default_value: None)" from dataset "MyDataset" in project "MyProject" failed., BigQuery execution failed., Error:
Message: Invalid table ID "RuntimeValueProvider(option: table_name, type: str, default_value: None)".
HTTP Code: 400
我读了这个答案,但到目前为止还没有 2017 年的东西吗?
解决方案
从这里提到的文档中,TableReference
接受以下参数(dataset_ref, table_id)
。从您的代码片段来看,大括号的位置似乎不正确。
with beam.Pipeline(options=pipeline_options) as pipeline:
dataset_ref = bigquery.DatasetReference('my-project-id', 'some_dataset')
table_spec = bigquery.TableReference(dataset_ref,
tableId=str(dataflow_options.table_name)
推荐阅读
- c++ - 如何为 android ndk 构建 rsocket-cpp
- python - Python合并排序重复数字
- sql - 如何使用 PostgreSQL 中另一个表中新插入的值?
- winapi - 支持 ARM 上的 Windows 10 桌面应用程序 - MFC 和 COM 和 OPOS 是否有效?
- flutter - 如何在 Dart 中生成类图?
- firebase - 由于权限问题,无法访问 Firebase 中的 Cloud Firestore
- bash - 为从标准输出生成的 zip 存档中的文件命名
- r - 如何返回最小目标函数对应的列?
- angular - 在哪里存储选定的列表项?
- python - 应用 groupby 后访问 DataFrame 的元素