python-3.x - Google Cloud Composer 中的 Airflow DAG“似乎丢失”似乎是因为调用了 Google Cloud Storage
问题描述
以前的问题
此问题已在此处、此处和此处报告过,但是,我怀疑这可能是因为调用了谷歌云存储。
前提/问题
以下代码位于 Google Cloud Composer 实例的 DAG 文件夹中。
以下代码块“动态”基于字符串列表生成 DAG,并将成功运行,生成名称为“ new_dummy_ohhel
”和“ new_dummy_hello
”的 2 个 DAG。这些 DAG 可以访问并且可以正常工作。
import datetime as dt
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from google.cloud.storage import Client as Client_Storage
list_strings = ["ohhello", "hellothere"]
# ^ Variable of importance
for string_val in list_strings:
name_dag = f"new_dummy_{string_val[:5]}"
# Just get the first 5 characters (this is important later)
dag = DAG(
name_dag,
default_args={
"owner": "airflow",
"start_date": dt.datetime.now() - dt.timedelta(days=1),
# ^ yesterday
"depends_on_past": False,
"email": [""],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": dt.timedelta(minutes=5),
},
schedule_interval=None,
)
with dag:
op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
op_dummy_end = DummyOperator(task_id="new-dummy-task-end")
op_dummy_start >> op_dummy_end
globals()[name_dag] = dag
现在,问题和奇怪的是,当对 Google Cloud Storage 的简单调用替换字符串列表时,仍然会创建 DAG,但在尝试访问它们时会产生错误。如果将变量list_strings
替换为以下内容,则会发生此错误。
list_strings = [
x.name
for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
假设我在存储桶“some_bucket_name”中有文件“something.json”和“omgwhy.json”,那么将创建两个不可访问且不可执行的 DAG:“ new_dummy_somet
”和“ new_dummy_omgwh
”。(仅获得前五 (5) 个字母,因此.
不包括 a。)这表明对存储的调用成功,但 DAG 仍然“似乎丢失”。
即使代码立即像下面这样覆盖该列表,DAG“似乎丢失”的错误仍然会出现(注意 DAG 将是“ new_dummy_ohhel
”和“ new_dummy_hello
”):
list_strings = [
x.name
for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
list_strings = ["ohhello", "hellothere"]
tl;博士与假设
当对 Google Cloud Storage 进行任何调用(包括成功和未使用的调用)时,文件中创建的所有 DAG 都会成功显示,但会表明DAG "<whatever dag name here>" seems to be missing.
我对成功调用为什么会导致此问题感到目瞪口呆。
尝试过的解决方案
- 尝试重新启动整个 Google Cloud Composer 实例
- 尝试通过添加环境变量重新启动 Airflow 网络服务器
解决方案
Cloud Composer 中的托管 Web 服务器在与您环境的主要工作机器(使用在环境创建期间指定的服务帐户)不同的服务帐户下运行。这意味着如果您打算从环境的主存储桶以外的存储桶中读取数据,则需要使用类似的 ACL,否则 Web 服务器将无法从存储桶中读取。
在您的情况下,Airflow 调度程序可能可以读取存储桶(使用环境的服务帐户),但 Web 服务器不能。调度程序将为 DAG 创建一个条目,但如果 Web 服务器无法在不遇到异常的情况下解析定义文件,您将收到“DAG x is missing”。
如果您觉得以上内容正确,您可以通过调整 Cloud Storage 存储桶 ACL 或启用DAG 序列化来解决此问题。序列化消除了 Web 服务器解析/执行定义文件的需要,并将其全部留给调度程序,因此它也可以解决您的问题。
推荐阅读
- string - 如何在 Bash 脚本中连接字符串变量
- c - 我在 C 程序中对数组进行排序时出错
- jvm - 当我执行“jmap -histo:live 4984”时,类名是什么 [C [B [[C]?
- reactjs - 带有数组的 JSON 对象的 React/JSX 箭头函数。
- angular - 提供者内部的显示组件
- asp.net-core - 从身份服务器验证 JWT 令牌 4
- r - R. 将字符类型的向量转换为字符串
- android - Android MediaRecorder Video - 随后播放的音频和视频轨道
- scala - Scala 会支持从调用者而不是从定义中检测隐式参数吗?
- python - Pandas 转换为 csv 时与 csv 日期时间不匹配