python - Dask 诊断 - 带有 map_partition / 延迟的进度条
问题描述
我正在使用分布式调度程序和分布式进度条。有没有办法让 Dataframe.map_partition 的进度条工作或延迟?我认为缺乏期货是导致酒吧不起作用的原因。如果我将代码更改client.submit
为进度条确实有效。
- 代码如下所示:
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
client = Client("tcp://....")
...
ddf = dd.read_parquet("...")
ddf = ddf.map_partitions(..)
progress(ddf) # no futures to pass
dask.compute(ddf)
- dask.delayed 的替代方案也不起作用:
delayed = [dask.deplayed(myfunc)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(delayed)
dask.compute(*delayed)
- Client.submit 确实产生了一个工作进度条,但代码执行失败,我还没有设法调试它。
futures = [client.submit(myfunc, ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)
有没有办法让 map_partitions 或 dask.delayed 的进度条(或完成的任务与总数的报告)?
- 延迟的完整代码示例:
import dask
import npumpy as np
import pandas as pd
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
import time
cl = Client("tcp://10.0.2.15:8786")
def wait(df):
print("Received chunk")
time.sleep(2)
print("finish")
df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 4)), columns=list('ABCD'))
ddf = dd.from_pandas(df, npartitions=4)
futures = [dask.delayed(wait)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)
解决方案
是的,您是对的,progress
旨在使用期货或包含期货的集合。但是,您无需提交大量期货即可使用它:
ddf = ddf.map_partitions(..)
fut = client.compute(ddf)
progress(fut)
# wait on fut, call fut.result() or continue
也不要忘记:您正在使用的分布式调度程序,即使仅在一台机器上,也带有一个包含相同信息的诊断仪表板。通常这是 at http://localhost:8787
,您可以从任何浏览器访问。
推荐阅读
- c# - 具有 CosmosDb 绑定的 Azure 函数正确的本地设置
- python - 无法在客户端加载 JSON 类型的消息
- python - DeepAR 将时间序列数组转换为 JSON - (pandas, python)
- r - 如何加快R中的顺序分析?
- excel - 如何根据单元格中的条件删除行
- java - 如何查找在 Eclipse 中未评估函数返回值的所有实例?
- java - 如何创建指向 jar 文件的可执行链接?
- sql - 存储在 SSIS 之外的变量值
- arrays - 使用python 3获取json数组的键
- c# - 我无法使用基于 cUrl 的证书制作 HttpClient Post