首页 > 解决方案 > Apache Beam 到 BigQuery

问题描述

我正在 Google Cloud Dataflow 中构建一个进程,它将在 Pub/Sub 中使用消息,并根据一个键的值将它们写入 BQ 或 GCS。我能够拆分消息,但我不确定如何将数据写入 BigQuery。我试过使用beam.io.gcp.bigquery.WriteToBigQuery,但没有运气。

我的完整代码在这里:https ://pastebin.com/4W9Vu4Km

基本上我的问题是我不知道如何在WriteBatchesToBQ(第 73 行)中指定变量element应该写入 BQ。

我也尝试过beam.io.gcp.bigquery.WriteToBigQuery直接在管道中使用(第 128 行),但后来出现错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我给它的不是字典,而是字典列表(我想使用 1 分钟窗口)。

请问有什么想法吗?(如果代码中有一些太愚蠢的地方,请告诉我——我只是在短时间内使用 apache Beam,我可能会忽略一些明显的问题)。

标签: pythongoogle-cloud-platformgoogle-bigquerygoogle-cloud-dataflowapache-beam

解决方案


WriteToBigQuery 示例格式如下:-

    project_id = "proj1"
    dataset_id = 'dataset1'
    table_id = 'table1'
    table_schema = ('id:STRING, reqid:STRING')

        | 'Write-CH' >> beam.io.WriteToBigQuery(
                                                    table=table_id,
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                    ))

您可以参考这个案例,它将让您简要了解光束数据管道。


推荐阅读