python - 在完美任务“批处理”中使用迭代器
问题描述
我正在使用prefect并定义一个流程来使用 cosmos db 插入文档。
问题是query_items()调用是可迭代的,对于大型容器,没有办法将所有条目保存在内存中。
我相信我的问题可以简化为:
given an iterator, how can I create batches to be processed (mapped) in a prefect flow?
例子:
def big_iterable_function_i_cannot_change():
yield from range(1000000) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched"):
some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())
上面的代码或类似的代码会给我一个错误:
prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
解决方案
您收到此错误是因为您没有定义big_iterable_function_i_cannot_change
为task
. prefect
实际上并没有flow
直接执行 a。flow
用于制作schedule
, (用说法dask
)——然后用于执行流程(据我所知)。仅当与dask executorprefect
一起使用时才会发生并行化。
这是我对你的看法flow
。但是,如果您无法将任务装饰器添加big_iterable_function_i_cannot_change
到 atask
中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。
import prefect
from prefect import Flow, Parameter, task
@task
def big_iterable_function_i_cannot_change():
return range(5) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched") as flow:
itter_res = big_iterable_function_i_cannot_change()
post_process_res = some_prefect_batching_magic.map(itter_res)
flow.visualize()
state = flow.run()
flow.visualize(flow_state=state)
推荐阅读
- unity3d - 如何统一重放声音?
- python - 使用 for 循环和字符串中的“索引”捆绑数组中的值
- javascript - 如何在 react.js 中为变量名添加另一个变量值
- node.js - 创建一个在服务器和客户端之间共享的私有 JS 模块
- android - Android Studio 中渲染问题的原因
- android - 在 Android 中的 Post 方法中缺少授权类型
- flutter - 颤振删除列中小部件之间的空间
- ssl-certificate - NGINX 上的 SSL 证书 - 是否需要结合两个 .crt 文件?
- github - 从 Github API 获取特定的 README.md 数据
- r - min max 给出 NA 值 R dplyr