首页 > 解决方案 > numpy np.where 扫描数组一次以获取对应于 T/F 条件的两组索引

问题描述

我有一个数组,我需要获得满足条件为真和相同条件为假的索引,例如:

x = np.random.rand(100000000)
true_inds = np.where(x < 0.5)
false_inds = np.where(x >= 0.5)

在我的用例x中非常大,这段代码在循环中被调用,并且分析显示这np.where实际上是瓶颈。我目前正在做类似于上述代码的事情,它不必要地扫描x两次以获得两组索引。是否有可能在不从头开始实施专门替换的情况下true_indsfalse_inds只需一次扫描即可获得两者?xnp.where

标签: pythonperformancenumpy

解决方案


对于大约 1500 以上的操作数大小,拆分稳定的结果argsort似乎是一个很好的解决方案(可能快两倍,尽管对于非常大的大小,它似乎变得更少)。

![在此处输入图像描述

如果你已经pythran安装了一个更一致的加速可以得到(numba应该是类似的)。

笔记:

  1. 使用稳定排序很重要,即kind="stable",当返回的索引无序时,省略的性能变得更糟

  2. 我怀疑这需要一个相当新的numpy版本,因为他们只是添加了新的和类型特定的排序算法。

  3. 一些解决方案以任意顺序绘制了回报指数,但如果有的话,获得的加速相当小

pythran生成情节的代码,必要时注释掉相关内容:

from simple_benchmark import BenchmarkBuilder, MultiArgument
import numpy as np
from scipy.sparse import csr_matrix
from biwhere_pthr import biwhere as use_pythran, \
    biwhere_halfordered as use_pythran_half, \
    biwhere_unordered as use_pythran_un

B = BenchmarkBuilder()

@B.add_function(alias="nonzero")
def use_nonzero(mask):
    return mask.nonzero()[0], (~mask).nonzero()[0]

@B.add_function(alias="argpartition")
def use_partition(mask):
    nf = mask.size - np.count_nonzero(mask)
    ft = mask.argpartition(nf)
    return ft[nf:],ft[:nf]

@B.add_function(alias="argsort")
def use_sort(mask):
    nf = mask.size - np.count_nonzero(mask)
    ft = mask.argsort()
    return ft[nf:],ft[:nf]

@B.add_function(alias="argsort stable")
def use_stable_sort(mask):
    nf = mask.size - np.count_nonzero(mask)
    ft = mask.argsort(kind="stable")
    return ft[nf:],ft[:nf]

@B.add_function(alias="sparse")
def use_sparse(mask):
    aux = csr_matrix((mask,mask,np.arange(mask.size+1)),(mask.size,2)).tocsc()
    return aux.indices[aux.indptr[1]:],aux.indices[:aux.indptr[1]]

B.add_function(alias="pythran")(use_pythran)
B.add_function(alias="pythran up down")(use_pythran_half)
B.add_function(alias="pythran unordered")(use_pythran_un)

@B.add_arguments('array size')
def argument_provider():
    for exp in range(8, 27):
        sz = int(2**exp)
        yield sz,np.random.randint(0,2,sz,dtype=bool)

# checks
for sz,mask in argument_provider():
    ref = use_nonzero(mask)
    for f in use_stable_sort,use_sparse,use_pythran:
        if not all(map(np.array_equal,f(mask),ref)):
            print(sz,f.__name__)
    for f in (use_partition,use_sort,use_pythran_un):
        if not all(map(np.array_equal,map(np.sort,f(mask)),ref)):
            print(sz,f.__name__)
    ht,hf = use_pythran_half(mask)
    if not all(map(np.array_equal,(ht[::-1],hf),ref)):
        print(sz,"use_pythran_half")

r = B.run()
r.plot(relative_to=use_nonzero)

import pylab
pylab.savefig('biwhere.png')

pythran使用 `pythran -O3 编译模块:

import numpy as np

#pythran export biwhere(bool[:])
#pythran export biwhere_halfordered(bool[:])
#pythran export biwhere_unordered(bool[:])

def biwhere(mask):
    nt = np.count_nonzero(mask)
    f,t = np.empty(mask.size-nt,int),np.empty(nt,int)
    i = 0
    j = 0
    for k,m in enumerate(mask):
        if m:
            t[j] = k
            j += 1
        else:
            f[i] = k
            i += 1
    return t,f

def biwhere_halfordered(mask):
    ft = np.empty(mask.size,int)
    i = 0
    j = mask.size-1
    for k,m in enumerate(mask):
        if m:
            ft[j] = k
            j -= 1
        else:
            ft[i] = k
            i += 1
    return ft[i:],ft[:i]

def biwhere_unordered(mask):
    ft = np.empty(mask.size,int)
    i = 0
    j = mask.size-1
    while i<=j:
        if not mask[i]:
            ft[i] = i
            i += 1
        elif mask[j]:
            ft[j] = j
            j -= 1
        else:
            ft[i] = j
            ft[j] = i
            i += 1
            j -= 1
    return ft[i:],ft[:i]

推荐阅读