google-cloud-dataflow - 在 Dataflow SQL 中解析属性
问题描述
给定一个 Pub/Sub 主题,BigQuery 允许使用Dataflow SQL语法将数据流式传输到表中。
假设您将此消息发布{"a": 1, "b": 2, "c": 3}
到某个主题。在 BigQuery 中,使用 Dataflow 引擎,您需要将my_topic
架构定义为
第1步
event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64
然后使用该命令创建一个 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery 表。
第2步
gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
--job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
--bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic
gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
bq query --nouse_legacy_sql \
'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'
+---------------------+-----+-----+-----+
| event_timestamp | a | b | c |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 | 1 | 2 | 3 |
在第 2 步,我还想发送--attribute="origin=gcloud,username=gcp"
到 Pub/Sub 主题。是否可以在步骤 1中定义模式以便自动写入表?
我一直在尝试不同的事情:
attributes: STRUCT
在架构中,遵循此 Beam 扩展文档,但我得到的只是 Dataflow 中的 JSON 解析错误gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2}' --attribute='c=3'
期望消息像这段代码一样被展平,但我在结果表中得到了一个NULL
值。c
谢谢你。
解决方案
Pub/Sub 属性属于MAP
类型,但这不是 Dataflow SQL支持的类型之一。有关于增加支持的讨论,但我不知道那是什么状态。
如果属性很重要,我建议使用ReadFromPubSub创建自定义管道
推荐阅读
- sql-server - 用于更新多个表的存储过程
- android - 如何手动检索控制台中上传的文件
- angular - 如何根据 API 响应选中和取消选中 mat-checkbox:Angular 6
- xcode - 致命错误:找不到“矢量”文件,使用 Xcode 10
- jquery - Bootstrap 面板通过滚动将面板主体尺寸增加到固定长度
- vb.net - HtmlAgilityPack 行的 Xpath 语法
- jvm - win10如何通过netbeans8.2调试openjdk9?
- linux - 树莓派 Autostart.desktop 终端
- symfony - DQL 查询多对多 IN
- security - 如何限制项目管理员仅创建新存储库,azure devops 中的主分支