首页 > 解决方案 > Numpy ravel 在对 ndarray 进行轻微更改后需要很长时间

问题描述

我正在处理一个展平图像(1920x1080x4),我需要在其中重塑(例如arr.reshape((1920,1080,4))),删除最后一个通道(例如arr[:,:,:3]),从 BGR 转换为 RGB(例如arr[:,:,::-1]),最后再次展平(例如arr.ravel())。问题在于 ravel/flatten/reshape(-1) 操作,这增加了大约 20ms 的计算时间。

为了便于调试,我假设传入的数组是一个扁平的 1920x1080x3 图像,这意味着我只需要担心 BGR 到 RGB 的转换和扁平化。但是,在测试 reshape+ravel、reshape+BGR2RGB 和最后 reshape+BGR2RGB+ravel 时,结果分别为 1ms、1ms、20ms,这对我来说没有任何意义,因为它只是一些值在内存中改变位置. ravel 是否有任何理由创建数组的副本?我怎样才能减少这个时间?

注意:我还测试了文档注释上写的就地重塑方法numpy.reshape,但是,如指定的那样,引发了错误,这意味着需要先复制数组才能重塑。

Bellow是我用于测试的代码:

import numpy as np
from time import time

arr_original = np.ones((1920*1080*3), dtype=np.uint8)

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr.ravel()
print(f"Reshape + ravel: {round(1000*(time()-s),2)}ms")

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr[:,:,::-1]
print(f"Reshape + BGR2RGB: {round(1000*(time()-s),2)}ms")

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr[:,:,::-1]
arr = arr.ravel()
print(f"Reshape + BGR2RGB + ravel: {round(1000*(time()-s),2)}ms")

在我的机器上输出

Reshape + ravel: 0.01ms
Reshape + BGR2RGB: 0.01ms
Reshape + BGR2RGB + ravel: 20.54ms

标签: pythonarraysnumpymemoryflatten

解决方案


这是因为您上面的所有操作都在为相同的数据生成视图,但是需要最后一个 ravel 来制作副本。

numpy 数组中的数组具有底层内存,形状和步幅确定每个元素的位置。

可以通过简单地改变形状和步幅来重新塑造一个连续的数组,而无需修改数据。这里的切片也是如此。但是由于您的最后一个数组不是连续的,因此当您使用 ravel 时,它将复制所有内容。

例如,在一个 3d 数组中,访问元素arr[i,j,k]意味着访问内存,base + i * arr.strides[0] + j * arr.strides[1] + k * arr.strides[1]你可以用它做很多事情(如果你在给定的轴上使用 stride 0,即使是广播)。

arr_original = np.ones((1920*1080*4), dtype=np.uint8)
arr = arr_original
print(arr.shape, arr.strides)
arr = arr.reshape(1920,1080,4)
print(arr.shape, arr.strides)
arr = arr[:,:,:3] # keep strides only reduces the length of the last axis
print(arr.shape, arr.strides)
arr = arr[:,:,::-1] # change strides of last axis to -1
print(arr.shape, arr.strides)
arr[0,0,:] = [3,4,5] # operations here are using the memory allocated
arr[0,1,:] = [6,7,8] # for arr_original
arr = arr.ravel()
arr[:] = 0 # this won't affect the original because the data was copied
print(arr_original[:8])

改进您的解决方案

这是您必须在库代码中进行试验或深入研究的情况。我更喜欢测试编写代码的不同方式。

原始方法通常是最好的方法,但在这种特定情况下,我们拥有的是未对齐的内存,因为您正在写入步幅为 3 的 uint8。

在判断性能时,重要的是要知道什么是合理的预期,在这种情况下,我们可以将格式转换与纯副本进行比较

arr = arr_original.copy()

每个循环 1.89 毫秒 ± 43.1 微秒(平均值 ± 标准偏差。7 次运行,每次 100 次循环)

arr = arr_original
arr = arr.reshape(1920,1080,4)
arr = arr[:,:,:3] 
arr = arr[:,:,::-1]
arr[0,0,:] = [3,4,5] 
arr[0,1,:] = [6,7,8] 
arr = arr.ravel()

每个循环 12.3 毫秒 ± 101 微秒(平均值 ± 标准偏差。7 次运行,每次 100 次循环)(大约比副本慢 6 倍)

arr = arr_original
arr = arr.reshape(1920,1080,4)
arr_aux = np.empty(arr.shape[:-1] + (3,), dtype=np.uint8)
arr_aux[:,:,0] = arr[:,:,2]
arr_aux[:,:,1] = arr[:,:,1]
arr_aux[:,:,2] = arr[:,:,0]
arr = arr_aux.ravel()

每个循环 4.16 毫秒 ± 25 微秒(平均值 ± 标准偏差。7 次运行,每次 100 次循环)(大约比副本慢 2 倍)

分析

在第一种情况下,最后一个轴的尺寸也非常小,所以这可能会导致一个小循环。让我们看看如何将此操作投影到 C++

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    for(int k = 0; k < 3; ++k){
      dst[(width * i + j)*3 + k] = src[(width * i + j)*4 + k];
    }
  }
}

当然 numpy 做的事情比这更多,并且可以通过将独立于循环变量的部分预计算移动到循环外来更有效地计算索引。这里的想法是说教的。

让我们计算执行的分支数,每个 for 循环将执行 N+1 个分支,进行 N 次迭代(N 个进入循环,最后一个跳转中断它)。所以上面的代码运行1 + height * (1 + 1 + width * (1 + 3)) ~ 4 * width * height分支。

如果我们展开最里面的循环为

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    dst[(width * i + j)*3 + 0] = src[(width * i + j)*4 + 0];
    dst[(width * i + j)*3 + 1] = src[(width * i + j)*4 + 1];
    dst[(width * i + j)*3 + 2] = src[(width * i + j)*4 + 2];
  }
}

分支数变为1 + height * (1 + 1 + width) ~ height * width, 减少 4 倍。我们无法在 python 中执行此操作,因为我们无法访问内部循环。但是使用第二个代码,我们实现了类似

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    dst[(width * i + j)*3 + 0] = src[(width * i + j)*4 + 0];
  }
}

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    dst[(width * i + j)*3 + 1] = src[(width * i + j)*4 + 1];
  }
}

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    dst[(width * i + j)*3 + 2] = src[(width * i + j)*4 + 2];
  }
}

那仍然会比第一个分支更少。

通过观察到的改进,我想最后一个循环必须调用诸如memcpy之类的函数或其他具有更多开销的函数,以尝试更快地处理更大的切片,可能会检查内存对齐,这将失败,因为我们使用的是步幅为 3 的字节。


推荐阅读