首页 > 解决方案 > 在 Dataflow SQL 中解析属性

问题描述

给定一个 Pub/Sub 主题,BigQuery 允许使用Dataflow SQL语法将数据流式传输到表中。

假设您将此消息发布{"a": 1, "b": 2, "c": 3}到某个主题。在 BigQuery 中,使用 Dataflow 引擎,您需要将my_topic架构定义为

第1步

event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64

然后使用该命令创建一个 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery 表。

第2步

gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
  --job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
  --bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic

gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
​
bq query --nouse_legacy_sql \
  'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'

+---------------------+-----+-----+-----+
|   event_timestamp   |  a  |  b  |  c  |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 |  1  |  2  |  3  |

第 2 步,我还想发送--attribute="origin=gcloud,username=gcp"到 Pub/Sub 主题。是否可以在步骤 1中定义模式以便自动写入表?

我一直在尝试不同的事情:

谢谢你。

标签: google-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


Pub/Sub 属性属于MAP类型,但这不是 Dataflow SQL支持的类型之一。有关于增加支持的讨论,但我不知道那是什么状态。

如果属性很重要,我建议使用ReadFromPubSub创建自定义管道


推荐阅读