首页 > 解决方案 > 如何使用 Cloud Composer/Apache Airflow 运行带有设置文件的 Dataflow 管道?

问题描述

我有一个工作的 Dataflow 管道,第一次运行setup.py以安装一些本地帮助模块。我现在想使用 Cloud Composer/Apache Airflow 来安排管道。我已经创建了我的 DAG 文件,并将它与我的管道项目一起放置在指定的 Google Storage DAG 文件夹中。文件夹结构如下所示:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

我的 DAG 中指定 setup.py 文件的部分如下所示:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

但是,当我查看 Airflow Web UI 中的日志时,我收到错误消息:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

我不确定为什么它无法找到安装文件。如何使用设置文件/模块运行我的 Dataflow 管道?

标签: python-2.7google-cloud-dataflowairflowgoogle-cloud-composer

解决方案


如果您查看DataflowPythonOperator的代码,它看起来主 py_file 可以是 GCS 存储桶内的文件,并且在执行管道之前由操作员本地化。但是,对于 dataflow_default_options,我没有看到类似的东西。似乎这些选项只是简单地复制和格式化。

由于 GCS dag 文件夹使用Cloud Storage Fuse安装在 Airflow 实例上,因此您应该能够使用“dags_folder”环境变量在本地访问该文件。即你可以做这样的事情:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

然后,您可以将 LOCAL_SETUP_FILE 变量用于 dataflow_default_options 中的 setup_file 属性。


推荐阅读