首页 > 解决方案 > 回填 dag 运行不会显示在 UI 中

问题描述

我有一个 2018 年开始的项目,但我会定期添加一些新的 DAG。因此,我所有的 DAGstart_date在 2018 年schedule_interval每天都有一个,但catch_up设置为 False,因为当我现在添加一个新的 DAG 时,我不希望它自 2018 年以来每天都运行(现在,也许我必须运行它这些天来)。

但是,大多数时候,我希望它在我添加它的日期之前运行几周。我预计 dag 在start_date和之间运行added_date(我添加 dag 的日期)在 DAG 树视图 UI 中显示为白色圆圈,因此,我可以在过去两周手动触发它。但是在这个视图中什么都没有出现......

因此,我手动运行回填(从命令行... UI 中的回填界面会很好),但回填执行的所有运行都不会出现在 UI 中。因此,如果一次回填运行失败,我仍然无法从 UI 重新运行它。

start_date是“没有显示和之间可能的 dag-runs added_date”。气流的预期行为?有什么办法可以克服这个吗?或者有没有更好的方法来处理这个用例:“添加一个 DAG,并在过去的某些日期手动运行它。”


[编辑] 程序化 dagrun 失败

正如Philipp所提出的,一个解决方案可以是打开追赶,并将所有运行标记start_dateadd_date成功(或失败,等等)。

我最终得到了这样的结果:

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State


start_date = datetime.datetime(2020, 10, 1, tzinfo=datetime.timezone.utc)
add_date = datetime.datetime.now(datetime.timezone.utc)

# Define a DAG with just one task
with DAG("test_catchup", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    BashOperator(
            task_id="print",
            bash_command="echo {{ ds }}"
            )

    # For all dates between start_date and add_date, create a dagrun and mark it as failed.
    for d in [start_date + datetime.timedelta(n) for n in range(int((add_date - start_date).days))]:
        print("Create dagrun for ", d.isoformat())
        try:
            dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()), state=State.FAILED, execution_date=d, start_date=add_date, external_trigger=False)
        except Exception as e:
            print("Error:", e)
            pass

如您所见,首先,您必须将 dagrun 创建嵌套在 try-except 块中,因为每次 Airflow 读取此文件时,它都会尝试在 dagrun 数据库中添加相同的条目,并因一些主键冲突而失败。

这大致有效。我所有的 dagruns 都出现了:

出现在 UI 中的所有 dagruns

但是,我不能(重新)运行它们中的任何一个。在清除一个时,我收到以下错误:

没有要清除的任务实例

我设法将一个标记为成功(将圆圈和正方形变为绿色),然后清除它,将 DAG(圆圈)变为运行状态,但将任务变为None状态并且它永远不会执行......


[编辑] 仅最新的运算符

Philipp的另一个好主意,我试了一下LatestOnlyOperator

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
from airflow.operators.latest_only_operator import LatestOnlyOperator


start_date = datetime.datetime(2020, 10, 1)
with DAG("test_latest_only", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
    LatestOnlyOperator(
            task_id="latest_filter"
    ) >> BashOperator(
            task_id="print",
            bash_command="echo {{ ds }}"
    )

结果(我已经手动重新运行了第一个 dagrun):

最新和任意过去的 dag 运行成功

优点:

缺点:

最后,在选项可用之前,这个运算符似乎是一个老把戏catchup=False,我不确定它的可持续性,因为已经讨论过要弃用它。

标签: airflowairflow-scheduler

解决方案


我终于找到了一个合适的解决方案,灵感来自“程序化 DAG 运行”的想法。

错误消息指出存在No task instances to clear. 因此,解决方案是在创建 DAG 运行时创建任务实例。

这是我的工作解决方案:

import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
from airflow.utils.timezone import parse

start_date = parse(datetime.datetime(2020, 11, 1).isoformat())
add_date = datetime.datetime(2020, 11, 8, tzinfo=datetime.timezone.utc)

# Create your dag
with DAG("test_task_instance", schedule_interval="@daily", catchup=False, start_date=start_date) as dag:
    # And its tasks
    BashOperator(
            task_id="print",
            bash_command="echo '{{ ti }}' > /tmp/{{ ds }}"
            )

    # For each expected execution date
    for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]:
        # If no DAG run already exists
        if dag.get_dagrun(d) is None:
            # Create a new one (marking the state as SKIPPED)
            dagrun = dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()),
                                       state=State.SKIPPED,
                                       execution_date=d,
                                       start_date=add_date,
                                       external_trigger=False)
            # And create task instance for each task for this DAG run
            for t in dag.tasks:
                TaskInstance(task=t, execution_date=dagrun.execution_date)

如您所见,try...except在创建新块之前不再需要我们要求的块。一个SKIPPED状态似乎更适合这些 DAG 运行,因为实际上没有运行任何东西。

然后,在气流 UI 中查看 DAG 时。

所有 dag 都标记为失败,任务没有状态

最后,您可以通过清除它并让调度程序完成它的工作来挑选要运行的 dag。

调度程序排队的任务

选定的 DAG 已成功运行


编辑:

我编辑了下面取代调度程序的代码,在调度程序制作它们之前创建了“catch up” dags。

当前的解决方案(在我看来并不是最好的)是放弃最后的预期执行日期。

- for d in dag.date_range(dag.start_date, end_date=add_date):
+ for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]:

推荐阅读