首页 > 解决方案 > 完美的工作流程:如何保留以前/每个计划运行的数据?

问题描述

在完美的工作流程中,我试图保留每个计划运行的数据。我需要比较每个以前和当前结果的数据。我尝试了 Localresult 和 checkpoint=true 但它不起作用。例如,

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import os
import prefect

@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
    files = os.listdir(test) 
    #prefect.context.a = files
    return files

schedule = IntervalSchedule(interval=timedelta(seconds=61))

with Flow("Test persist data", schedule) as flow:
    a = file_scan()
flow.run()

我的流程安排为每 61 秒/每分钟一次。在第一次运行时,我可能会得到空结果,但对于第二次计划运行,我应该得到以前的流结果进行比较。谁能帮我实现这一目标?谢谢!

标签: pythonprefect

解决方案


更新(2021 年 11 月 15 日):

不知道是什么原因, LocalResult当我通过仪表板或 cli运行注册流程checkpoint时实际工作。当我在 python 代码中手动触发流程(例如:)时,它不起作用。prefect run -n "singtel_postpaid.py" --watchflow.run


尝试以下两个选项:

选项 1:使用target参数:

https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target

@task(target="func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def func_task():
    return "999"

选项 2:实例化实例并手动LocalResult调用。write

MY_RESULTS = LocalResult(dir="./.prefect"). 

@task(checkpoint=True, result=LocalResult(dir="./.prefect"))
def func_task():
    MY_RESULTS.write("999")
    return "999"

PS:

在 decorator 中使用时,遇到与我相同的问题LocalResult似乎对我不起作用。例如:

@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():

推荐阅读