首页 > 解决方案 > 如果我导入 google.cloud.storage,则无法在 Google Composer 上部署 DAG

问题描述

出于某种原因,如果我在 DAG 中导入 google.cloud.storage,我无法在 Google Composer 上部署 DAG 文件。如果我尝试部署这样的 DAG 文件,那么它不会被添加到 DagBag 中,因此最终会在 Airflow 网站中出现一个非链接条目并且不可用。此时,通常的信息图标显示:此 DAG 在 Web 服务器的 DagBag 对象中不可用。它显示在此列表中是因为调度程序在元数据数据库中将其标记为活动。与实际的语法错误不同,页面顶部没有错误消息。

关于我是否导入 google.cloud.storage,我已经将其准确归结为。甚至我是否真的使用它。例如,如果我注释掉存储导入行,这个 dag 可以正常工作,如果我替换它,它不会安装在 Composer 中。有人会知道为什么吗?

import datetime
from airflow import DAG
from google.cloud import storage
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'email': ['kevin@here.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017,1,1),
}

def ingest_file(**kwargs):
    status = 'OK'
    return status

# Not scheduled, trigger only
dag = DAG('ingest_test', default_args=default_args, schedule_interval=None)

ingest = PythonOperator(task_id = 'ingest', provide_context = True, 
  python_callable = ingest_file, dag = dag)

标签: google-cloud-platformgoogle-cloud-storage

解决方案


如果您在 DAG 或自定义 Operator 中需要 PyPi 包,那么您不会收到错误消息,DAG 只是没有部署。如果你得到这个,那么确保你需要的所有包都安装在 Composer 环境中。

请注意,存在然后不存在的行为仍然存在,但实际上会在片刻后安定下来


推荐阅读