python - 完美的工作流程:如何保留以前/每个计划运行的数据?
问题描述
在完美的工作流程中,我试图保留每个计划运行的数据。我需要比较每个以前和当前结果的数据。我尝试了 Localresult 和 checkpoint=true 但它不起作用。例如,
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import os
import prefect
@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
files = os.listdir(test)
#prefect.context.a = files
return files
schedule = IntervalSchedule(interval=timedelta(seconds=61))
with Flow("Test persist data", schedule) as flow:
a = file_scan()
flow.run()
我的流程安排为每 61 秒/每分钟一次。在第一次运行时,我可能会得到空结果,但对于第二次计划运行,我应该得到以前的流结果进行比较。谁能帮我实现这一目标?谢谢!
解决方案
更新(2021 年 11 月 15 日):
不知道是什么原因,
LocalResult
当我通过仪表板或 cli运行注册流程checkpoint
时实际工作。当我在 python 代码中手动触发流程(例如:)时,它不起作用。prefect run -n "singtel_postpaid.py" --watch
flow.run
尝试以下两个选项:
选项 1:使用target
参数:
https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
@task(target="func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def func_task():
return "999"
选项 2:实例化实例并手动LocalResult
调用。write
MY_RESULTS = LocalResult(dir="./.prefect").
@task(checkpoint=True, result=LocalResult(dir="./.prefect"))
def func_task():
MY_RESULTS.write("999")
return "999"
PS:
在 decorator 中使用时,遇到与我相同的问题LocalResult
似乎对我不起作用。例如:
@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
推荐阅读
- google-cloud-firestore - Firestore 安全规则 - 使用自定义声明
- java - 运行时实例化全局可访问列表
- android - 如何修复此错误“找不到 android.room:room-compiler:2.2.4。”
- c - 显示大于 n = 5 的费马数时出现问题
- c# - 从 Azure 函数调用存储过程时,“System.Data.SqlClient.TdsParser”的类型初始化程序引发异常错误
- android - Android Studio - 为什么我的回收器视图卡住而不让我滚动到它的底部?
- kubernetes - 如何在 Kubernetes 中检查外部指标数据?
- python - 在列表的情况下共享引用有问题
- php - 重写列出 MySQL 表的函数
- android - 使用 EncryptedSharedPreferences 和 Biometric 在 Android 上保存敏感数据