google-cloud-dataflow - 如何在 Python 中为 Google Dataflow Pipeline 设置编码器?
问题描述
我正在 Python 中创建一个自定义数据流作业,以将数据从 PubSub 摄取到 BigQuery。表有许多嵌套字段。
在这个管道中我可以在哪里设置 Coder?
avail_schema = parse_table_schema_from_json(bg_out_schema)
coder = TableRowJsonCoder(table_schema=avail_schema)
with beam.Pipeline(options=options) as p:
# Read the text from PubSub messages.
lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name")
| 'Map' >> beam.Map(coder))
# transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
Error: Map can be used only with callable objects. Received TableRowJsonCoder instead.
解决方案
在上面的代码中,编码器应用于从 PubSub 读取的文本消息。
WriteToBigQuery 适用于字典和 TableRow。json.load 发出 dict,因此您可以简单地使用它的输出写入 BigQuery,而无需应用任何编码器。请注意,字典中的字段必须与表模式匹配。
为避免编码器问题,我建议使用以下代码。
avail_schema = parse_table_schema_from_json(bg_out_schema)
with beam.Pipeline(options=options) as p:
# Read the text from PubSub messages.
lines = (p | beam.io.ReadFromPubSub(subscription="projects/project_name/subscriptions/subscription_name"))
transformed = lines| 'Parse JSON to Dict' >> beam.Map(json.loads)
transformed | 'Write to BigQuery' >> beam.io.WriteToBigQuery("Project:DataSet.Table", schema=avail_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
推荐阅读
- asp.net-core - AuthorizeRouteView 不适用于已注销的用户
- c# - 可空引用类型和“[CS8603] 可能的空引用返回。”
- angular - 如何在 POST 请求中使用 ngFor 显示数据
- javascript - 为什么我在 jquery ajax 帖子上不断收到 405 方法?
- android - JSON 数据只显示一次
- android - 将样式应用于警报对话框
- c# - “分层编译”如何影响 .NET Core 3.0 应用程序,我应该何时禁用它?
- arrays - 如何在 Swift 中逐字地获得相等大小的数组切片
- javascript - NODE_ENV 此时出现意外错误
- c# - 颜色不包含定义