google-cloud-platform - 如果我导入 google.cloud.storage,则无法在 Google Composer 上部署 DAG
问题描述
出于某种原因,如果我在 DAG 中导入 google.cloud.storage,我无法在 Google Composer 上部署 DAG 文件。如果我尝试部署这样的 DAG 文件,那么它不会被添加到 DagBag 中,因此最终会在 Airflow 网站中出现一个非链接条目并且不可用。此时,通常的信息图标显示:此 DAG 在 Web 服务器的 DagBag 对象中不可用。它显示在此列表中是因为调度程序在元数据数据库中将其标记为活动。与实际的语法错误不同,页面顶部没有错误消息。
关于我是否导入 google.cloud.storage,我已经将其准确归结为。甚至我是否真的使用它。例如,如果我注释掉存储导入行,这个 dag 可以正常工作,如果我替换它,它不会安装在 Composer 中。有人会知道为什么吗?
import datetime
from airflow import DAG
from google.cloud import storage
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'email': ['kevin@here.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017,1,1),
}
def ingest_file(**kwargs):
status = 'OK'
return status
# Not scheduled, trigger only
dag = DAG('ingest_test', default_args=default_args, schedule_interval=None)
ingest = PythonOperator(task_id = 'ingest', provide_context = True,
python_callable = ingest_file, dag = dag)
解决方案
如果您在 DAG 或自定义 Operator 中需要 PyPi 包,那么您不会收到错误消息,DAG 只是没有部署。如果你得到这个,那么确保你需要的所有包都安装在 Composer 环境中。
请注意,存在然后不存在的行为仍然存在,但实际上会在片刻后安定下来
推荐阅读
- python - Django 将普通查询集转换为迭代器
- scala - SBT 插件:传播编译器插件并在下游启用插件
- html - mailto 未打开 gmail 撰写选项卡
- python - 将 gTT 导入 Python 问题
- apache-spark - 使用提供的 Hadoop 构建 Spark
- javascript - Expressjs 同步/异步中间件问题——如何解决?
- typescript - 有没有一种方法可以键入一个函数,该函数通过名称从 Typescript 中的对象中删除属性,同时保持类型安全(不进行类型转换)?
- react-native - 使用 React Native 上的样式化组件级联文本颜色
- c# - C#属性检查是一个值等于构造函数参数并获取构造函数值
- reporting-services - 在 Microsoft Visual Studio IDE 中生成 SSRS 项目时,我在哪里可以找到并设置 .rdl 文件的报告名称?