首页 > 解决方案 > 外部任务传感器 execution_date_fn

问题描述

我想在我的 dag 的外部任务传感器中使用 execution_date_fn 以使其依赖于先前的实例(我不想使用 depends_on_past 参数)。有人可以告诉我如何使用“execution_date_fn”获取同一 dag 的先前执行 ID,这样我就不必在“execution_delta”中指定小时/分钟。不确定以下方法“prev_execution_date_1”是否正确。感谢您对此的任何帮助。

def prev_execution_date_1(**kwargs):
    dr = self.get_dagrun(session=session)
    previous_scheduled_date = dr.previous_schedule(self.execution_date)
    return previous_scheduled_date


    external_0 = ExternalTaskSensor(
            task_id='Check_Previous_Instance',
            external_task_id=None,
            external_dag_id='dag_abc_1',
            allowed_states=['success'],
            execution_date_fn=prev_execution_date_1,
            dag=dag
    )

标签: dependenciestaskairflowsensorsdirected-acyclic-graphs

解决方案


execution_date_fn 用于在未通过 execution_delta 的情况下根据当前执行日期计算所需的执行日期,在当前稳定版本 1.10 中,有参数检查,它最多接受 2 个 args,context['execution_date'] 和 context。并且上下文不包含会话,因此您无法在其中查询数据库。您可以尝试从airflow.settings 导入会话,但这不是一个好主意。

一个例子是:

lambda dt: dt + timedelta(days=1)

如果您的调度程序间隔不经常更改,则使用 execution_delta 是一种相对简单的方法。


推荐阅读