首页 > 解决方案 > Google Cloud Composer 中的 Airflow DAG“似乎丢失”似乎是因为调用了 Google Cloud Storage

问题描述

以前的问题

此问题已在此处此处此处报告过,但是,我怀疑这可能是因为调用了谷歌云存储。

前提/问题

以下代码位于 Google Cloud Composer 实例的 DAG 文件夹中。

以下代码块“动态”基于字符串列表生成 DAG,并将成功运行,生成名称为“ new_dummy_ohhel”和“ new_dummy_hello”的 2 个 DAG。这些 DAG 可以访问并且可以正常工作。

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from google.cloud.storage import Client as Client_Storage


list_strings = ["ohhello", "hellothere"]
# ^ Variable of importance

for string_val in list_strings:

    name_dag = f"new_dummy_{string_val[:5]}"
    # Just get the first 5 characters (this is important later)

    dag = DAG(
        name_dag,
        default_args={
            "owner": "airflow",
            "start_date": dt.datetime.now() - dt.timedelta(days=1),
            # ^ yesterday
            "depends_on_past": False,
            "email": [""],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": dt.timedelta(minutes=5),
        },
        schedule_interval=None,
    )

    with dag:
        op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
        op_dummy_end = DummyOperator(task_id="new-dummy-task-end")

        op_dummy_start >> op_dummy_end

    globals()[name_dag] = dag

现在,问题和奇怪的是,当对 Google Cloud Storage 的简单调用替换字符串列表时,仍然会创建 DAG,但在尝试访问它们时会产生错误。如果将变量list_strings替换为以下内容,则会发生此错误。

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]

假设我在存储桶“some_bucket_name”中有文件“something.json”和“omgwhy.json”,那么将创建两个不可访问且不可执行的 DAG:“ new_dummy_somet”和“ new_dummy_omgwh”。(仅获得前五 (5) 个字母,因此.不包括 a。)这表明对存储的调用成功,但 DAG 仍然“似乎丢失”。

即使代码立即像下面这样覆盖该列表,DAG“似乎丢失”的错误仍然会出现(注意 DAG 将是“ new_dummy_ohhel”和“ new_dummy_hello”):

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
list_strings = ["ohhello", "hellothere"]

tl;博士与假设

当对 Google Cloud Storage 进行任何调用(包括成功和未使用的调用)时,文件中创建的所有 DAG 都会成功显示,但会表明DAG "<whatever dag name here>" seems to be missing.我对成功调用为什么会导致此问题感到目瞪口呆。

尝试过的解决方案

标签: python-3.xgoogle-cloud-storageairflowgoogle-cloud-composer

解决方案


Cloud Composer 中的托管 Web 服务器在与您环境的主要工作机器(使用在环境创建期间指定的服务帐户)不同的服务帐户下运行。这意味着如果您打算从环境的主存储桶以外的存储桶中读取数据,则需要使用类似的 ACL,否则 Web 服务器将无法从存储桶中读取。

在您的情况下,Airflow 调度程序可能可以读取存储桶(使用环境的服务帐户),但 Web 服务器不能。调度程序将为 DAG 创建一个条目,但如果 Web 服务器无法在不遇到异常的情况下解析定义文件,您将收到“DAG x is missing”。

如果您觉得以上内容正确,您可以通过调整 Cloud Storage 存储桶 ACL 或启用DAG 序列化来解决此问题。序列化消除了 Web 服务器解析/执行定义文件的需要,并将其全部留给调度程序,因此它也可以解决您的问题。


推荐阅读