Dask diagnostics: progress bar with map_partitions / delayed

Question

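I am using the distributed scheduler and the distributed progress bar. Is there a way to make the progress bar work with DataFrame.map_partitions or with delayed? I think the lack of futures is why the bar doesn't work: if I change my code to use client.submit, the progress bar does work.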

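import dask
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress

client = Client("tcp://....")
...
ddf = dd.read_parquet("...")
ddf = ddf.map_partitions(..)

# 1) map_partitions: the lazy DataFrame holds no futures, so the bar has nothing to track
progress(ddf)  # no futures to pass
dask.compute(ddf)

# 2) dask.delayed: also lazy, so again no futures
delayed = [dask.delayed(myfunc)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(delayed)
dask.compute(*delayed)

# 3) client.submit returns futures, and with these the progress bar works
futures = [client.submit(myfunc, ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
results = client.gather(futures)  # collect the futures' results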

Is there a way to get a progress bar (or a report of completed tasks out of the total) for map_partitions or dask.delayed?

import dask
import numpy as np
import pandas as pd
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics.progressbar import progress
import time

cl = Client("tcp://10.0.2.15:8786")

def wait(df):
    print("Received chunk")
    time.sleep(2)
    print("finish")

df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 4)), columns=list('ABCD'))
ddf = dd.from_pandas(df, npartitions=4)

# Note: these are Delayed objects, not futures, so progress() has nothing to track
futures = [dask.delayed(wait)(ddf.get_partition(i)) for i in range(ddf.npartitions)]
progress(futures)
dask.compute(*futures)

Tags: python, pandas, numpy, dask, dask-distributed

Answer


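Yes, you are right: progress is designed to work with futures, or with collections that contain futures. However, you don't need to submit lots of individual futures to use it: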

ddf = ddf.map_partitions(..)
fut = client.compute(ddf)
progress(fut)
# wait on fut, call fut.result() or continue

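Also, don't forget that the distributed scheduler you are using, even on a single machine, comes with a diagnostics dashboard that shows the same information. It is usually at http://localhost:8787, which you can open from any browser.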
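If the dashboard is not on the default port, or the scheduler runs on another host (such as tcp://10.0.2.15:8786 in the question), the client can report the address itself through Client.dashboard_link. The sketch below assumes a throwaway in-process cluster; the link may be an empty string if the dashboard's optional dependencies (bokeh) are not installed.

```python
from distributed import Client

# Assumption: a local in-process cluster for illustration; with a remote
# scheduler you would connect with Client("tcp://<host>:8786") instead.
client = Client(processes=False)

# Ask the client where the dashboard is actually being served
# rather than assuming http://localhost:8787.
link = client.dashboard_link
print(link)

client.close()
```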

