首页 > 解决方案 > 如果特定任务失败,是否可以终止流程?

问题描述

我的流程很长,我想将其状态标记为Failed特定任务失败。

我在这里阅读了文档,但是他们似乎没有指定这种情况。我的猜测是我需要在@task()装饰器中以某种方式具体说明这一点,但我不知道如何。

非常感谢任何指导。

标签: etlprefect

解决方案


这可以通过将您的特定任务声明为 Flow 的“参考任务”来实现。 参考任务是最终确定每个流程运行的最终状态的任务。

例如,以下代码片段创建了一个流程,其中包含两个独立的、不相关的任务,这些任务随机失败了一半。但是,整体流运行状态将仅根据 的状态确定task_one,因为我们将该任务指定为唯一的参考任务:

import random
from prefect import task, Flow


@task
def task_one():
    if random.random() > 0.5:
        raise ValueError("Random failure")

@task
def task_two():
    if random.random() > 0.5:
        raise ValueError("Random failure")


flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])

推荐阅读