How to set a Coder for a Google Dataflow Pipeline in Python?

Problem Description

I am creating a custom Dataflow job in Python to ingest data from PubSub into BigQuery. The table has many nested fields.

Where in this pipeline can I set the Coder?

avail_schema = parse_table_schema_from_json(bg_out_schema)
coder = TableRowJsonCoder(table_schema=avail_schema)

with beam.Pipeline(options=options) as p:
    # Read the text from PubSub messages.
    lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
              | 'Map' >> beam.Map(coder))
    # transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)


Error: Map can be used only with callable objects. Received TableRowJsonCoder instead.

Tags: google-cloud-dataflow, google-cloud-pubsub

Solution

In the code above, the coder is being applied to the text messages read from PubSub, which is what raises the error: beam.Map expects a callable, and a TableRowJsonCoder instance is not one.

WriteToBigQuery works with both dictionaries and TableRows. json.loads emits a dict, so you can simply write its output to BigQuery without applying any coder. Note that the fields in the dictionary must match the table schema.
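For illustration, here is a minimal sketch of that requirement; the two-field schema and the sample message below are assumptions invented for this example, not taken from the question:

import json
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# Hypothetical schema; the field names are assumptions for illustration only.
schema_json = '{"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}, {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}]}'
avail_schema = parse_table_schema_from_json(schema_json)

# json.loads on a message whose keys match the schema yields a dict
# that WriteToBigQuery accepts as-is, no coder needed.
row = json.loads('{"name": "alice", "age": 30}')  # {'name': 'alice', 'age': 30}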

To avoid the coder issue, I suggest the following code.

import json
import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# bg_out_schema and options are defined elsewhere, as in the question.
avail_schema = parse_table_schema_from_json(bg_out_schema)

with beam.Pipeline(options=options) as p:
    # Read the raw message payloads from PubSub.
    lines = p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
    # Parse each message into a dict; WriteToBigQuery accepts dicts directly.
    transformed = lines | 'Parse JSON to Dict' >> beam.Map(json.loads)
    transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
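One caveat: ReadFromPubSub emits the raw message payload as bytes by default. json.loads accepts bytes on Python 3.6+, but if you prefer the decoding to be explicit, the parse step could be written as below (the lambda is my own variant, not part of the answer above):

transformed = lines | 'Parse JSON to Dict' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8')))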
