dependencies - 外部任务传感器 execution_date_fn
问题描述
我想在我的 dag 的外部任务传感器中使用 execution_date_fn 以使其依赖于先前的实例(我不想使用 depends_on_past 参数)。有人可以告诉我如何使用“execution_date_fn”获取同一 dag 的先前执行 ID,这样我就不必在“execution_delta”中指定小时/分钟。不确定以下方法“prev_execution_date_1”是否正确。感谢您对此的任何帮助。
def prev_execution_date_1(**kwargs):
dr = self.get_dagrun(session=session)
previous_scheduled_date = dr.previous_schedule(self.execution_date)
return previous_scheduled_date
external_0 = ExternalTaskSensor(
task_id='Check_Previous_Instance',
external_task_id=None,
external_dag_id='dag_abc_1',
allowed_states=['success'],
execution_date_fn=prev_execution_date_1,
dag=dag
)
解决方案
execution_date_fn 用于在未通过 execution_delta 的情况下根据当前执行日期计算所需的执行日期,在当前稳定版本 1.10 中,有参数检查,它最多接受 2 个 args,context['execution_date'] 和 context。并且上下文不包含会话,因此您无法在其中查询数据库。您可以尝试从airflow.settings 导入会话,但这不是一个好主意。
一个例子是:
lambda dt: dt + timedelta(days=1)
如果您的调度程序间隔不经常更改,则使用 execution_delta 是一种相对简单的方法。
推荐阅读
- jquery - I want to know why an error occurred when I sent an array to view with jquery ajax (django,jquery)
- firebase - What is the best practice for a multi-tenant app with Collection Group Queries in Firestore?
- algorithm - Solving a recurrence relation for a recursive function
- abap - Method importing parameter type range of any
- php - 为什么 finfo_buffer() 返回的 mime-type 与 mime_content_type() 不同?
- flutter - 未按预期从等待函数返回未来
- r - 没有填充的堆积条形图?
- python - 如何在新模型创建过程中获取 ForeignKey 字段值?
- javascript - 为什么我的密码验证码不起作用?
- c# - 如何修复 System.Xml.Linq.XContainer.Element(...) 返回 null