首页 > 解决方案 > 迭代看似相同的 dask 数组需要不同的时间

问题描述

我正在尝试使用 Dask 读取大小未知的混合文件(npy、csv 等)。在进行一些涉及切片的操作之前,这些文件将被转换为数组并合并为一个。

但是,我注意到根据数组的创建方式存在显着的速度差异。考虑以下两种从 csv 创建数组的方法:

现在,简单地迭代第一个数组比迭代第二个数组快近 1000 倍。我将假设这是因为数组是使用内存中已有的对象创建的。

据我了解,第一个数组由一个块组成,因此迭代它相对较快。但是,即使我将第二个数组重新分块以匹配第一个数组,迭代速度也不会显着提高。我还注意到 array.nbytes 属性为两个数组显示了相同的数字,这表明它们都完全存在于内存中。

我的期望是,一旦我开始遍历数组,Dask 会将必要的相关块读入内存。而且由于只有一个块适合内存(对于这种特殊情况),我希望速度是可比的,忽略一次将块读入内存的开销。请帮助我理解我在这里的推理中犯了什么错误。

下面是一个演示此行为的最小示例 [python 3.6.2,numpy 1.17.4,dask 2.9.0]:

import time                                                                                          
import numpy as np                                                                                   
import dask.array as da                                                                              
import dask.dataframe as dd                                                                          

def make_files():                                                                                    
    np.random.random(0)                                                                              
    mat = np.random.random((6000, 784))                                                                
    np.savetxt('data.csv', mat, delimiter=',', header=','.join(str(x) for x in range(784)))          

def from_csv_via_np():                                                                               
    mat = np.loadtxt('data.csv', delimiter=',', skiprows=1)                                          
    arr = da.from_array(mat)                                                                         
    return arr                                                                                       

def from_csv_via_df():                                                                               
    df = dd.read_csv('data.csv')                                                                     
    arr = df.to_dask_array(lengths=True)                                                             
    arr = da.rechunk(arr, (6000, 784))                                                               
    return arr                                                                                       

def benchmark(fn):                                                                                   
    arr = fn()                                                                                       

    iter_start = time.perf_counter()                                                                 
    n_iters = 10                                                                                     
    for i in range(n_iters):                                                                         
        x = arr[i].compute()                                                                         

    iter_elapsed = (time.perf_counter() - iter_start)/n_iters                                        

    print(f"func: {fn.__name__}")                                                                    
    print(f"    array: {repr(arr)}")                                                                 
    print(f"    read: {read_elapsed} seconds")                                                       
    print(f"    iter: {iter_elapsed} seconds")                                                       
    print(f"    size: {arr.nbytes} bytes")                                                           

if __name__ == "__main__":                                                                           
    make_files()                                                                                     
    benchmark(from_csv_via_np)                                                                       
    benchmark(from_csv_via_df)                                                                       

标签: pythondask

解决方案


我相信在更仔细地阅读了Dask 内部文档之后,我自己已经找到了答案。

数组内部并不相同,我在查看底层任务图后意识到。可以使用以下命令绘制它们:

arr.visualize('image.pdf')

这是一个 numpy 支持的数组的图表,这是一个数据帧支持的数组的图表,两者都访问数组的第一个元素。

如您所见,第一个相当简单,它起源于单个块,原来是存储在内存中的原始 numpy 数组本身。这也可以通过打印图形字典来验证,可在以下位置访问:

arr.__dask_graph__().layers

相比之下,第二张图相对复杂,它源自两个不同的 read_csv 任务。由于 Dask 默认不保留中间结果,因此可以安全地假设每次调用 .compute() 都会发生这些计算。在某些情况下,缓存可能有助于解决类似问题。

所以简短的回答是,源自数据帧的数组需要重复的 I/O 调用,这使得它比直接从 RAM 读取的 numpy 慢得多。


推荐阅读