首页 > 解决方案 > 我们可以使用单个 Google Cloud Dataflow 将来自多个 Pubsub(源)的数据写入多个 GCS(接收器)吗?

问题描述

我有 3 个不同的 Pubsub(源)和 3 个相应的 GCS 存储桶(接收器)供他们处理类似的数据。目前,我的 Java 应用程序提供了三个 Cloud Dataflow 资产,它们使用窗口写入将数据从 Pubsubs 写入 GCS 存储桶。

Current pipelines:
pubsub_topic_abc ---> dataflow_abc ---> gcs_bucket_abc

pubsub_topic_def ---> dataflow_def ---> gcs_bucket_def

pubsub_topic_ghi ---> dataflow_ghi ---> gcs_bucket_ghi

有没有一种方法可以使管道使用单个数据流,该数据流可以从多个源读取数据并将它们写入多个相应的接收器?基本上,来自的数据pubsub_topic_abc应该去gcs_bucket_abc等。

Desired pipeline:
pubsub_topic_abc ----                  ---> gcs_bucket_abc
                    |                 |
pubsub_topic_def -------> dataflow -------> gcs_bucket_def
                    |                 |
pubsub_topic_ghi ----                  ---> gcs_bucket_ghi

我找到了这个链接,它解释了数据流如何从多个 Pubsub 中读取,但我不确定如何实现多个接收器写入功能(动态输出路径?)。可能吗?

标签: google-cloud-storagegoogle-cloud-dataflowpipelinegoogle-cloud-pubsubapache-beam

解决方案


是的,这是可能的。在您的代码中,执行这样的循环

  • 对于所有源(例如源名称数组)
    • 在这个源上创建 Pubsub 阅读器(你会得到一个 PCollection)
    • 在 PCollection 上应用转换
    • 为转换后的 PCollection 创建专用于 Source 的接收器

您重用转换,但源和接收器是特定的。您的数据流图将向您展示这一点

pubsub_topic_abc ---> transformation ---> gcs_bucket_abc

pubsub_topic_def ---> transformation ---> gcs_bucket_def

pubsub_topic_ghi ---> transformation ---> gcs_bucket_ghi

但所有这些都将在同一个数据流作业中运行。


推荐阅读