python - 迭代看似相同的 dask 数组需要不同的时间
问题描述
我正在尝试使用 Dask 读取大小未知的混合文件(npy、csv 等)。在进行一些涉及切片的操作之前,这些文件将被转换为数组并合并为一个。
但是,我注意到根据数组的创建方式存在显着的速度差异。考虑以下两种从 csv 创建数组的方法:
- 使用 numpy.readtxt 和 dask.array.from_array,
- 使用 dask.dataframe.read_csv 和 dask.dataframe.to_dask_array。
现在,简单地迭代第一个数组比迭代第二个数组快近 1000 倍。我将假设这是因为数组是使用内存中已有的对象创建的。
据我了解,第一个数组由一个块组成,因此迭代它相对较快。但是,即使我将第二个数组重新分块以匹配第一个数组,迭代速度也不会显着提高。我还注意到 array.nbytes 属性为两个数组显示了相同的数字,这表明它们都完全存在于内存中。
我的期望是,一旦我开始遍历数组,Dask 会将必要的相关块读入内存。而且由于只有一个块适合内存(对于这种特殊情况),我希望速度是可比的,忽略一次将块读入内存的开销。请帮助我理解我在这里的推理中犯了什么错误。
下面是一个演示此行为的最小示例 [python 3.6.2,numpy 1.17.4,dask 2.9.0]:
import time
import numpy as np
import dask.array as da
import dask.dataframe as dd
def make_files():
np.random.random(0)
mat = np.random.random((6000, 784))
np.savetxt('data.csv', mat, delimiter=',', header=','.join(str(x) for x in range(784)))
def from_csv_via_np():
mat = np.loadtxt('data.csv', delimiter=',', skiprows=1)
arr = da.from_array(mat)
return arr
def from_csv_via_df():
df = dd.read_csv('data.csv')
arr = df.to_dask_array(lengths=True)
arr = da.rechunk(arr, (6000, 784))
return arr
def benchmark(fn):
arr = fn()
iter_start = time.perf_counter()
n_iters = 10
for i in range(n_iters):
x = arr[i].compute()
iter_elapsed = (time.perf_counter() - iter_start)/n_iters
print(f"func: {fn.__name__}")
print(f" array: {repr(arr)}")
print(f" read: {read_elapsed} seconds")
print(f" iter: {iter_elapsed} seconds")
print(f" size: {arr.nbytes} bytes")
if __name__ == "__main__":
make_files()
benchmark(from_csv_via_np)
benchmark(from_csv_via_df)
解决方案
我相信在更仔细地阅读了Dask 内部文档之后,我自己已经找到了答案。
数组内部并不相同,我在查看底层任务图后意识到。可以使用以下命令绘制它们:
arr.visualize('image.pdf')
这是一个 numpy 支持的数组的图表,这是一个数据帧支持的数组的图表,两者都访问数组的第一个元素。
如您所见,第一个相当简单,它起源于单个块,原来是存储在内存中的原始 numpy 数组本身。这也可以通过打印图形字典来验证,可在以下位置访问:
arr.__dask_graph__().layers
相比之下,第二张图相对复杂,它源自两个不同的 read_csv 任务。由于 Dask 默认不保留中间结果,因此可以安全地假设每次调用 .compute() 都会发生这些计算。在某些情况下,缓存可能有助于解决类似问题。
所以简短的回答是,源自数据帧的数组需要重复的 I/O 调用,这使得它比直接从 RAM 读取的 numpy 慢得多。
推荐阅读
- java - Java NIO Socketchannel 只在关闭输出后发送数据。为什么是这样?
- terraform - 在 Terraform 中管理模块版本
- python - 在 Python Cloud Function 中使用错误处理程序返回 JSON
- r - 在 Windows 10 (x64) 下无法读取 R 中的 microsoft access 数据库
- excel - 使用 .bat 文件和任务调度程序执行 .vbs 文件
- sql - 已经有一个名为##ObjectName 的对象
- python - 当我们处理大长度的数字时如何减少python中的时间限制
- scala - S3AbortableInputStream:并非所有字节都从 S3ObjectInputStream 中读取
- r - 使用两个列表中的公共信息创建矩阵
- python - 根据条件替换熊猫数据框列中的数据,如果条件不满足则跳过