首页 > 解决方案 > 使用 Cloud Composer 在文件到达 GCS 时触发 Dataflow 作业

问题描述

我是 GCP 的新手,并试图更好地了解 Google Cloud 提供的 ETL 服务。我正在尝试做一个 POC,只要文件到达 GCS 中的指定位置,数据流作业就会运行,我想利用 Cloud Composer 来做同样的事情。

如何在 DAG 中使用不同的气流运算符(一个用于监视 GCS 中的文件,另一个用于触发数据流作业)?

标签: python-3.xgoogle-cloud-platformairflowgoogle-cloud-dataflowgoogle-cloud-composer

解决方案


  1. 创建文件创建/到达您的存储桶时触发的创建云函数。您可以在以下文档中找到如何部署此类功能 https://cloud.google.com/functions/docs/calling/storage#deploy
  2. 编写函数以触发 DAG,以下文档对此进行了详细说明:https ://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
  3. 使用以下任一气流运算符将数据流作业作为 DAG 中的任务运行
  • DataflowCreateJavaJobOperator如果您使用的是 Java SDK
  • DataflowCreatePythonJobOperator如果您使用的是 Python SDK

更多详细信息:https ://airflow.apache.org/docs/apache-airflow-providers-google/2.1.0/operators/cloud/dataflow.html


这是您关于作曲家问题的答案,但您应该考虑将 Composer 替换为Cloud Workflows,而不是使用气流数据流运算符,您可以使用Cloud Workflows dataflow connector,并且您可以编写云函数来触发 Cloud Workflows 执行. 有许多不同语言的客户端库可以做到这一点。这里是选择您的图书馆的链接:https ://cloud.google.com/workflows/docs/reference/libraries

最后,与 Composer 相比,Cloud Workflows 的优点。

  • 它是无服务器的:您不必担心机器类型、网络……

  • 您使用 yaml 轻松快速地描述您的工作流程,因此您不必担心学习气流

  • 它比作曲家便宜得多

  • ...


推荐阅读