airflow - 回填 dag 运行不会显示在 UI 中
问题描述
我有一个 2018 年开始的项目,但我会定期添加一些新的 DAG。因此,我所有的 DAGstart_date
在 2018 年schedule_interval
每天都有一个,但catch_up
设置为 False,因为当我现在添加一个新的 DAG 时,我不希望它自 2018 年以来每天都运行(现在,也许我必须运行它这些天来)。
但是,大多数时候,我希望它在我添加它的日期之前运行几周。我预计 dag 在start_date
和之间运行added_date
(我添加 dag 的日期)在 DAG 树视图 UI 中显示为白色圆圈,因此,我可以在过去两周手动触发它。但是在这个视图中什么都没有出现......
因此,我手动运行回填(从命令行... UI 中的回填界面会很好),但回填执行的所有运行都不会出现在 UI 中。因此,如果一次回填运行失败,我仍然无法从 UI 重新运行它。
start_date
是“没有显示和之间可能的 dag-runs added_date
”。气流的预期行为?有什么办法可以克服这个吗?或者有没有更好的方法来处理这个用例:“添加一个 DAG,并在过去的某些日期手动运行它。”
[编辑] 程序化 dagrun 失败
正如Philipp所提出的,一个解决方案可以是打开追赶,并将所有运行标记start_date
为add_date
成功(或失败,等等)。
我最终得到了这样的结果:
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
start_date = datetime.datetime(2020, 10, 1, tzinfo=datetime.timezone.utc)
add_date = datetime.datetime.now(datetime.timezone.utc)
# Define a DAG with just one task
with DAG("test_catchup", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
BashOperator(
task_id="print",
bash_command="echo {{ ds }}"
)
# For all dates between start_date and add_date, create a dagrun and mark it as failed.
for d in [start_date + datetime.timedelta(n) for n in range(int((add_date - start_date).days))]:
print("Create dagrun for ", d.isoformat())
try:
dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()), state=State.FAILED, execution_date=d, start_date=add_date, external_trigger=False)
except Exception as e:
print("Error:", e)
pass
如您所见,首先,您必须将 dagrun 创建嵌套在 try-except 块中,因为每次 Airflow 读取此文件时,它都会尝试在 dagrun 数据库中添加相同的条目,并因一些主键冲突而失败。
这大致有效。我所有的 dagruns 都出现了:
但是,我不能(重新)运行它们中的任何一个。在清除一个时,我收到以下错误:
没有要清除的任务实例
我设法将一个标记为成功(将圆圈和正方形变为绿色),然后清除它,将 DAG(圆圈)变为运行状态,但将任务变为None
状态并且它永远不会执行......
[编辑] 仅最新的运算符
从Philipp的另一个好主意,我试了一下LatestOnlyOperator
。
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
from airflow.operators.latest_only_operator import LatestOnlyOperator
start_date = datetime.datetime(2020, 10, 1)
with DAG("test_latest_only", schedule_interval="@daily", catchup=True, start_date=start_date) as dag:
LatestOnlyOperator(
task_id="latest_filter"
) >> BashOperator(
task_id="print",
bash_command="echo {{ ds }}"
)
结果(我已经手动重新运行了第一个 dagrun):
优点:
- 它实现了我尝试做的事情
缺点:
- 它需要一个操作员
- 引导很慢。运行我的 12 个 dags 大约需要 5 分钟,而只是停止第一个任务(如果我用它来“回填”2 年的日常工作?)
- 您不能清除 DAG,而只能清除 LatestOnlyOperator 下的第一个任务,否则它将继续阻止下游任务的执行。
最后,在选项可用之前,这个运算符似乎是一个老把戏catchup=False
,我不确定它的可持续性,因为已经讨论过要弃用它。
解决方案
我终于找到了一个合适的解决方案,灵感来自“程序化 DAG 运行”的想法。
错误消息指出存在No task instances to clear
. 因此,解决方案是在创建 DAG 运行时创建任务实例。
这是我的工作解决方案:
import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash_operator import BashOperator
from airflow.utils.state import State
from airflow.utils.timezone import parse
start_date = parse(datetime.datetime(2020, 11, 1).isoformat())
add_date = datetime.datetime(2020, 11, 8, tzinfo=datetime.timezone.utc)
# Create your dag
with DAG("test_task_instance", schedule_interval="@daily", catchup=False, start_date=start_date) as dag:
# And its tasks
BashOperator(
task_id="print",
bash_command="echo '{{ ti }}' > /tmp/{{ ds }}"
)
# For each expected execution date
for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]:
# If no DAG run already exists
if dag.get_dagrun(d) is None:
# Create a new one (marking the state as SKIPPED)
dagrun = dag.create_dagrun(run_id="programmatic_fill_{}".format(d.isoformat()),
state=State.SKIPPED,
execution_date=d,
start_date=add_date,
external_trigger=False)
# And create task instance for each task for this DAG run
for t in dag.tasks:
TaskInstance(task=t, execution_date=dagrun.execution_date)
如您所见,try...except
在创建新块之前不再需要我们要求的块。一个SKIPPED
状态似乎更适合这些 DAG 运行,因为实际上没有运行任何东西。
然后,在气流 UI 中查看 DAG 时。
最后,您可以通过清除它并让调度程序完成它的工作来挑选要运行的 dag。
编辑:
我编辑了下面取代调度程序的代码,在调度程序制作它们之前创建了“catch up” dags。
当前的解决方案(在我看来并不是最好的)是放弃最后的预期执行日期。
- for d in dag.date_range(dag.start_date, end_date=add_date):
+ for d in dag.date_range(dag.start_date, end_date=add_date)[:-1]:
推荐阅读
- json - 如何将 JSON 数组转换为 Oracle 中的一组行?
- python - Python:处理 None 值并可在循环中使用的 Attrgetter
- javascript - 在 Android 应用内浏览器中显示(安装)PWA 横幅不起作用
- vuetify.js - 如何在 Vuetify Navigation Drawer 的右侧添加标签
- mysql - 防止在 CMD 中为 MySQL 选择命令覆盖数据
- python - 有什么方法可以将 chrome 无头模式制作的 PDF 存储在 RAM 中?
- list - 如何在 Kotlin 中并行迭代两个列表?
- python-3.x - 将文本格式的电子邮件合并到一个 csv 文件中以进行机器学习
- html - 在 Angular 中使用 HTTP 获取数据
- c# - 如何将 ASP.NET Core 3.1 托管为虚拟服务帐户?