首页 > 解决方案 > 气流如何从谷歌云平台上的 dags 主文件夹加载/更新 DagBag?

问题描述

请不要对我的回答投反对票。如果需要,我会更新并更正我的话。我已经完成了我的家庭作业研究。我有点新,所以试图理解这一点。

我想了解 Google 云平台上的气流如何获取从 dags 主文件夹到 UI 的更改。另外请帮助我使用我的 dags 设置脚本。我已经阅读了很多答案以及书籍。书链接在这里

我试着从第 69 页找出我的答案,上面写着

3.11 调度和触发器 Airflow 调度器监控所有任务和所有 DAG,并触发已满足依赖关系的任务实例。在幕后,它监视文件夹并与它可能包含的所有 DAG 对象保持同步,并定期(每分钟左右)检查活动任务以查看它们是否可以被触发。

我对这本书的理解是调度程序会定期从 dags 主文件夹中获取更改。(这是对的吗?)

我还阅读了有关堆栈溢出的多个答案,我发现这个有用的 链接

但是答案仍然不包含从 dag 主文件夹中的 script.py 创建/更新 dagbag 的过程。如何感知变化。

请帮助我完成我的 dags 设置脚本。我们创建了一个通用的 python 脚本,它通过读取/迭代配置文件来动态创建 dag。

下面是目录结构

/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py

执行流程 dag_creater.py 如下:-

 1. Iterate in dags/workflow/config folder get the Config JSON file and
    read variable dag_id.
 2. create Parent_dag = DAG(dag_id=dag_id,
    start_date=start_date, schedule_interval=schedule_interval,
                             default_args=default_args, catchup=False) 
 3. Read tasks with dependencies of that dag_id from config json file
    (example :- [[a,[]],[b,[a]],[c,[b]]]) and code it as task_a >>
    task_b >> task_c

这样就创建了 dag。一切正常。Dags 在 UI 上也可见并且运行良好。

但问题是,我的 dag 创建脚本每次都在运行。即使在每个任务日志中,我也会看到所有 dag 的日志。我希望这个脚本运行一次。只是为了填写元数据条目。我无法理解为什么它每次都在运行。请让我理解这个过程。

我知道一旦我们第一次设置元数据,airflow initdb 就会运行。所以这并不是一直在做这个更新。

请注意:我无法输入真实代码,因为这是我所在组织的限制。但是,如果被问到,我会提供更多信息。

标签: google-cloud-platformairflowairflow-scheduler

解决方案


Airflow Scheduler实际上在 Airflow 运行时环境中持续运行,作为监视 DAG 文件夹中的更改并触发驻留在该文件夹中的相关 DAG 任务的主要贡献者。Airflow Scheduler 服务的主要设置可以在文件中找到,主要是有效影响一般 DAG 任务维护airflow.cfg的心跳间隔。

但是,特定任务的执行方式是根据Airflow 配置中的Executor模型定义的。

为了存储可用于 Airflow 运行时环境的 DAG,GCP Composer 使用 Cloud Storage,实现特定的文件夹结构,同步任何到达/dags带有*.py扩展名的文件夹的对象,以验证它是否包含 DAG定义

如果您希望在 Airflow 运行时运行 DAG 传播脚本,那么在这个特定用例中,我建议您查看PythonOperator,在单独的 DAG 中使用它来调用和执行您的自定义通用 Python 代码,并保证一次只调度一次. 您可以查看这个 Stack线程以及实现细节。


推荐阅读