python - Apache Beam 到 BigQuery
问题描述
我正在 Google Cloud Dataflow 中构建一个进程,它将在 Pub/Sub 中使用消息,并根据一个键的值将它们写入 BQ 或 GCS。我能够拆分消息,但我不确定如何将数据写入 BigQuery。我试过使用beam.io.gcp.bigquery.WriteToBigQuery
,但没有运气。
我的完整代码在这里:https ://pastebin.com/4W9Vu4Km
基本上我的问题是我不知道如何在WriteBatchesToBQ
(第 73 行)中指定变量element
应该写入 BQ。
我也尝试过beam.io.gcp.bigquery.WriteToBigQuery
直接在管道中使用(第 128 行),但后来出现错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
。这可能是因为我给它的不是字典,而是字典列表(我想使用 1 分钟窗口)。
请问有什么想法吗?(如果代码中有一些太愚蠢的地方,请告诉我——我只是在短时间内使用 apache Beam,我可能会忽略一些明显的问题)。
解决方案
WriteToBigQuery 示例格式如下:-
project_id = "proj1"
dataset_id = 'dataset1'
table_id = 'table1'
table_schema = ('id:STRING, reqid:STRING')
| 'Write-CH' >> beam.io.WriteToBigQuery(
table=table_id,
dataset=dataset_id,
project=project_id,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
您可以参考这个案例,它将让您简要了解光束数据管道。
推荐阅读
- apache-spark - 使用 X 列对数据框进行分区并写入没有 X 列的数据
- android - 将 qraphql 查询中的对象数组显示到回收器视图 android
- pine-script - 我正在尝试锻炼如何总结牛市和熊市蜡烛在给定日期的所有开盘/收盘
- linux - 如何在没有完整的 linux 源代码树的情况下交叉编译外部模块?
- python - Docker:在映像中、主机上或两者中安装 pip 包?
- c# - 404 未找到,即使图像存在
- python - Django Paginator 无法正常工作,我该如何解决
- python-3.x - 尝试修改xml时如何在Python中通过ElementTree解析xml时保留命名空间
- react-native - 在图标名称中使用下划线时出错
在本机反应 - c++ - std::function 的目标对象在哪里被禁止抛出破坏?