首页 > 解决方案 > 气流 | DAG 是如何开始的

问题描述

有谁知道如何获得 DAG 的启动方式(无论是在调度程序上还是手动)?我正在使用气流 2.1。

我有一个每小时运行一次的 DAG,但有时我会手动运行它来测试一些东西。我想捕获 DAG 是如何启动的,并将该值传递给我正在保存一些数据的表中的列。这将允许我根据计划或手动启动进行过滤并过滤测试信息。

谢谢!

标签: airflow

解决方案


从执行上下文中,例如python_callable提供给PythonOperator您可以访问与当前执行相关的DagRun对象:

def _print_dag_run(**kwargs):
    dag_run: DagRun = kwargs["dag_run"]
    print(f"Run type: {dag_run.run_type}")
    print(f"Externally triggered ?: {dag_run.external_trigger}")

日志输出:

[2021-09-08 18:53:52,188] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_dagRun_info
AIRFLOW_CTX_TASK_ID=python_task
AIRFLOW_CTX_EXECUTION_DATE=2021-09-07T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-09-07T00:00:00+00:00
Run type: backfill
Externally triggered ?: False

dag_run.run_type将是:“手动”、“计划”或“回填”。(不知道有没有其他人)

external_trigger文档:

external_trigger (bool) -- 这个 dag 运行是否被外部触发

您也可以使用 jinja 访问模板字段中的默认变量,有一个代表dag_run对象的变量:

    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="echo dag_run type is: {{ dag_run.run_type }}",
    )

完整的 DAG:

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


default_args = {
    "owner": "airflow",
}

def _print_dag_run(**kwargs):
    dag_run: DagRun = kwargs["dag_run"]

    print(f"Run type: {dag_run.run_type}")
    print(f"Externally triggered ?: {dag_run.external_trigger}")


dag = DAG(
    dag_id="example_dagRun_info",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=["example_dags", "params"],
    catchup=False,
)
with dag:

    python_task = PythonOperator(
        task_id="python_task",
        python_callable=_print_dag_run,
    )

    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="echo dag_run type is: {{ dag_run.run_type }}",
    )


推荐阅读