首页 > 解决方案 > 将标记设置为成功并强制所有下游任务运行

问题描述

我们需要暂停流程,直到客户确认他对数据没问题,然后继续流程。

我们是怎么做的:准备带有pdf附件的电子邮件,发送给客户进行验证,如果他同意pdf正确完成,那么客户将恢复流程运行。

我们执行它的方式是暂停流程,并尝试重新启动是通过分别有两个并行任务“send_validation_email_pdf”和“user_validation”。我们将任务“user_validation”设置为“故意”失败。同时,任务“send_validation_email_pdf”正在发送带有链接的pdf文档,该链接允许客户端将任务“user_validation”状态设置为“标记为成功”。

超链接示例:

http://localhost:8080/admin/airflow/success?task_id=user_validation&dag_id=rf.duree&upstream=false&downstream=false&future=false&past=false&execution_date=2019-05-24T00%3A00%3A00%2B00%3A00&origin=http%3A%2F% 2Flocalhost%3A8080%2Fadmin%2Fairflow%2Ftree%3Fdag_id%3Drf.duree&confirmed=true

我们希望此任务设置为成功并恢复整个流程。但是,仅将此任务标记为“成功”是不够的。原因是下一个任务仍然保持相同的状态=“upstream_failed”,不会重新运行。

我试图在名为“fin_send_email_validation”的下一个任务中包含以下参数:trigger_rule=TriggerRule.ALL_SUCCESS——但它没有用。还尝试在“fin_send_email_validation”中使用参数以下参数“depends_on_past=True”。也没有工作。

有人对如何暂停流程有更好的想法,直到客户确认他对数据没问题,然后继续流程?或者有人可以给我一个关于如何解除我已经尝试做的事情的想法。

from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

send_validation_email_pdf = PythonOperator(
    task_id="send_validation_email_pdf",
    provide_context=True,
    python_callable=set_send_validation_email_pdf,
    dag=dag,
)

user_validation = PythonOperator(
    task_id="user_validation",
    retries=0,
    email_on_failure=False,
    python_callable=user_validation,
    dag=dag,
)

fin_send_email_validation = DummyOperator(task_id="fin_send_email_validation", trigger_rule=TriggerRule.ALL_SUCCESS,
                                          depends_on_past=True, dag=dag)

fin_refresh_TDE >> send_validation_email_pdf >> fin_send_email_validation
fin_refresh_TDE >> user_validation >> fin_send_email_validation```


Pause process flow, until the client confirmed he is OK with the data, then continue the flow.

标签: airflowairflow-scheduler

解决方案


有一个只是休眠的任务,具有适当的 soft_fail 超时延迟。满意后,只需将其标记为成功,其余工作流程应继续进行。

def user_validation():
    time.sleep(86350)


user_validation = PythonOperator(
        task_id="user_validation",
        retries=0,
        email_on_failure=False,
        soft_fail=True,
        timeout=86400,
        python_callable=user_validation,
        dag=dag,
    )

推荐阅读