airflow - 将标记设置为成功并强制所有下游任务运行
问题描述
我们需要暂停流程,直到客户确认他对数据没问题,然后继续流程。
我们是怎么做的:准备带有pdf附件的电子邮件,发送给客户进行验证,如果他同意pdf正确完成,那么客户将恢复流程运行。
我们执行它的方式是暂停流程,并尝试重新启动是通过分别有两个并行任务“send_validation_email_pdf”和“user_validation”。我们将任务“user_validation”设置为“故意”失败。同时,任务“send_validation_email_pdf”正在发送带有链接的pdf文档,该链接允许客户端将任务“user_validation”状态设置为“标记为成功”。
超链接示例:
我们希望此任务设置为成功并恢复整个流程。但是,仅将此任务标记为“成功”是不够的。原因是下一个任务仍然保持相同的状态=“upstream_failed”,不会重新运行。
我试图在名为“fin_send_email_validation”的下一个任务中包含以下参数:trigger_rule=TriggerRule.ALL_SUCCESS——但它没有用。还尝试在“fin_send_email_validation”中使用参数以下参数“depends_on_past=True”。也没有工作。
有人对如何暂停流程有更好的想法,直到客户确认他对数据没问题,然后继续流程?或者有人可以给我一个关于如何解除我已经尝试做的事情的想法。
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
send_validation_email_pdf = PythonOperator(
task_id="send_validation_email_pdf",
provide_context=True,
python_callable=set_send_validation_email_pdf,
dag=dag,
)
user_validation = PythonOperator(
task_id="user_validation",
retries=0,
email_on_failure=False,
python_callable=user_validation,
dag=dag,
)
fin_send_email_validation = DummyOperator(task_id="fin_send_email_validation", trigger_rule=TriggerRule.ALL_SUCCESS,
depends_on_past=True, dag=dag)
fin_refresh_TDE >> send_validation_email_pdf >> fin_send_email_validation
fin_refresh_TDE >> user_validation >> fin_send_email_validation```
Pause process flow, until the client confirmed he is OK with the data, then continue the flow.
解决方案
有一个只是休眠的任务,具有适当的 soft_fail 超时延迟。满意后,只需将其标记为成功,其余工作流程应继续进行。
def user_validation():
time.sleep(86350)
user_validation = PythonOperator(
task_id="user_validation",
retries=0,
email_on_failure=False,
soft_fail=True,
timeout=86400,
python_callable=user_validation,
dag=dag,
)
推荐阅读
- c# - 如何通过 InvokeHelper 将 variant* 传递给 C#?
- java - Servet - 为什么我得到空指针异常?
- javascript - 在导航上刷新 Vue 组件
- c++ - 按钮不更新
- weblogic - weblogic.rmi.extensions.DisconnectMonitorUnavailableException:无法为 [null] 注册 DisconnectListener
- javascript - 在延迟到期之前调用 JavaScript setTimeout 回调
- javascript - 为什么我们在前端框架中使用 ES6 模块而不是 CommonJS 进行导入
- r - 安装包和调用库成功后boxM函数出错
- c# - C# NUnit 和测试元组
- python-3.x - 使用方法链处理异常