google-cloud-storage - 我们可以使用单个 Google Cloud Dataflow 将来自多个 Pubsub(源)的数据写入多个 GCS(接收器)吗?
问题描述
我有 3 个不同的 Pubsub(源)和 3 个相应的 GCS 存储桶(接收器)供他们处理类似的数据。目前,我的 Java 应用程序提供了三个 Cloud Dataflow 资产,它们使用窗口写入将数据从 Pubsubs 写入 GCS 存储桶。
Current pipelines:
pubsub_topic_abc ---> dataflow_abc ---> gcs_bucket_abc
pubsub_topic_def ---> dataflow_def ---> gcs_bucket_def
pubsub_topic_ghi ---> dataflow_ghi ---> gcs_bucket_ghi
有没有一种方法可以使管道使用单个数据流,该数据流可以从多个源读取数据并将它们写入多个相应的接收器?基本上,来自的数据pubsub_topic_abc
应该去gcs_bucket_abc
等。
Desired pipeline:
pubsub_topic_abc ---- ---> gcs_bucket_abc
| |
pubsub_topic_def -------> dataflow -------> gcs_bucket_def
| |
pubsub_topic_ghi ---- ---> gcs_bucket_ghi
我找到了这个链接,它解释了数据流如何从多个 Pubsub 中读取,但我不确定如何实现多个接收器写入功能(动态输出路径?)。可能吗?
解决方案
是的,这是可能的。在您的代码中,执行这样的循环
- 对于所有源(例如源名称数组)
- 在这个源上创建 Pubsub 阅读器(你会得到一个 PCollection)
- 在 PCollection 上应用转换
- 为转换后的 PCollection 创建专用于 Source 的接收器
您重用转换,但源和接收器是特定的。您的数据流图将向您展示这一点
pubsub_topic_abc ---> transformation ---> gcs_bucket_abc
pubsub_topic_def ---> transformation ---> gcs_bucket_def
pubsub_topic_ghi ---> transformation ---> gcs_bucket_ghi
但所有这些都将在同一个数据流作业中运行。
推荐阅读
- python - QhullError:QH6154 Qhull 精度错误:初始单纯形是平坦的(面 1 与内点共面)
- neo4j - Neo4j:标签传播算法(LPA)结果中迭代的含义
- amazon-web-services - AppSync GraphQL 将 1-1 映射到现有的 GraphQL API
- java - 我们如何以编程方式隐藏android中的应用程序?
- mysql - 从 Ubuntu 18.04 升级到 20.04 后启动 MySQL 服务器的问题
- java - 我如何证明该算法给出了在办公桌抽屉中找到欧元的正确概率?
- ruby-on-rails - 如何在 RSpec 测试中在 RSwag 模式之外发送参数?
- r - 从 expss R 包中添加行
- c# - 如何观察添加子游戏对象并使用 UniRx 获取它?
- bash - (Mac) 登录时自动运行脚本