python - 使用 numpy 向量化循环
问题描述
我正在处理图像文件。需要帮助以矢量化形式转换以下循环
for i in range(height):
for j in range(width):
print(j,i)
start1[i, j, 0] = -0.5 + j / (width - 1)
start1[i, j, 1] = (-0.5 + i / (height - 1)) * height / width
start1[i, j, 2] = 0
解决方案
下面我提供了两种解决方案,一种是使用 just numpy
,另一种是使用numpy
+ numba
,两者都可以使用python -m pip install numpy numba
.
import numpy as np
# -------- Version 1, vectorized ----------
height, width = 2, 4
start1 = np.zeros((height, width, 3), dtype = np.float64)
start1[:, :, 0] = -0.5 + np.arange(width)[None, :] / (width - 1)
start1[:, :, 1] = (-0.5 + np.arange(height)[:, None] / (height - 1)) * height / width
print(start1)
# -------- Version 2, vectorized ----------
import numba
@numba.njit(cache = True)
def compute_start1(height, width):
start1 = np.zeros((height, width, 3), dtype = np.float64)
for i in range(height):
for j in range(width):
start1[i, j, 0] = -0.5 + j / (width - 1)
start1[i, j, 1] = (-0.5 + i / (height - 1)) * height / width
start1[i, j, 2] = 0
return start1
print(compute_start1(2, 4))
输出:
[[[-0.5 -0.25 0. ]
[-0.16666667 -0.25 0. ]
[ 0.16666667 -0.25 0. ]
[ 0.5 -0.25 0. ]]
[[-0.5 0.25 0. ]
[-0.16666667 0.25 0. ]
[ 0.16666667 0.25 0. ]
[ 0.5 0.25 0. ]]]
[[[-0.5 -0.25 0. ]
[-0.16666667 -0.25 0. ]
[ 0.16666667 -0.25 0. ]
[ 0.5 -0.25 0. ]]
[[-0.5 0.25 0. ]
[-0.16666667 0.25 0. ]
[ 0.16666667 0.25 0. ]
[ 0.5 0.25 0. ]]]
我提供了 Numba 解决方案,因为它是一个不错的包,它允许您仅使用常规 python 代码编写非常快速的代码。甚至无需考虑如何将代码实现为 numpy 函数。几乎任何非常简单的代码都可以通过 numba 进行50-200x
多次增强。
Numba是一个即时编译器,可将 python 代码转换为 C++,然后转换为快速机器代码,这将原始代码100x
平均提高了大约倍!它与使用 NumPy 一样快,有时甚至更快。它也与 numpy 密切相关,它支持内部的所有 numpy 函数,甚至可以通过为函数装饰器提供parallel = True
参数来并行化它们,请参阅此 jit 文档以供参考。njit
在大多数情况下,为了对100x
代码进行矢量化和提升时间,您只需要@numba.njit
在函数之前添加一行就可以了。当然,Numba 可以编译为纯快速 C++ 而不是任何代码,但大多数涉及大量循环/条件/等的非常简单的算法以及数字和/或 numpy 操作都可以由 Numba 编译和增强。
接下来是所有三种解决方案(一个非矢量化和两个矢量化)的时间测量代码。需要安装一次性模块python -m pip install numpy numba timerit
。
import numpy as np
# -------- Version 0, non-vectorized ----------
def f0(height, width):
start1 = np.zeros((height, width, 3), dtype = np.float32)
for i in range(height):
for j in range(width):
start1[i, j, 0] = -0.5 + j / (width - 1)
start1[i, j, 1] = (-0.5 + i / (height - 1)) * height / width
return start1
# -------- Version 1, vectorized ----------
def f1(height, width):
start1 = np.zeros((height, width, 3), dtype = np.float32)
start1[:, :, 0] = -0.5 + np.arange(width)[None, :] / (width - 1)
start1[:, :, 1] = (-0.5 + np.arange(height)[:, None] / (height - 1)) * height / width
return start1
# -------- Version 2, vectorized ----------
import numba
@numba.njit(cache = True, fastmath = True)
def f2(height, width):
start1 = np.zeros((height, width, 3), dtype = np.float32)
for i in range(height):
for j in range(width):
start1[i, j, 0] = -0.5 + j / (width - 1)
start1[i, j, 1] = (-0.5 + i / (height - 1)) * height / width
return start1
# -------- Time measuring ----------
from timerit import Timerit
Timerit._default_asciimode = True
h, w = 256, 512
ra, rt = None, None
for f in [f0, f1, f2]:
print(f'{f.__name__}: ', end = '', flush = True)
tim = Timerit(num = 15, verbose = 1)
for t in tim:
a = f(h, w)
if ra is None:
ra, rt = a, tim.mean()
else:
t = tim.mean()
assert np.allclose(a, ra)
print(f'speedup {round(rt / t, 3)}x')
输出:
f0: Timed best=159.324 ms, mean=159.855 +- 0.4 ms
f1: Timed best=1.212 ms, mean=1.257 +- 0.0 ms
speedup 127.178x
f2: Timed best=1.294 ms, mean=1.310 +- 0.0 ms
speedup 122.065x
推荐阅读
- docker - 标记 docker 多阶段构建的所有阶段
- r - 在 bar/col 图中堆叠多列 df 并创建 %
- differential-equations - 我如何区分这个复杂的功能?(等温球的泊松方程)?
- numpy - 如何将 PyTorch 张量转换为 Numpy 的 ndarray?
- google-apps-script - 在创建新电子表格时安装基于时间的触发器
- sql-server - 通过 Visio 中的 Powershell 脚本自动刷新 SQL Server 导入的数据集
- r - 将数据文件渲染为 R 中的格式化 word 文档
- python - 比较python“wordwise”中的2个字符串并提取它们的差异
- azure - Azure 工作簿条形图不会像 azure 分析日志查询那样按第三列值拆分数据
- office-js - Office.context.mailbox.item -> 没有数据,outlook 桌面客户端为空。在OWA工作