Conditional vectorized calculation with numpy arrays without using direct masking

Problem description

Following up on another question:

import numpy as np

repeat=int(1e5)
r_base = np.linspace(0,4,5)
a_base = 2
np.random.seed(0)
r_mat = r_base * np.random.uniform(0.9,1.1,(repeat,5))

a_array = a_base * np.random.uniform(0.9,1.1, repeat)


# original slow approach
def func_vetorized_level1(r_row, a):
    if r_row.mean()>2:
        result = np.where((r_row >= a), r_row - a, np.nan)
    else:
        result = np.where((r_row >= a), r_row + a, 0)
    return result
# try to broadcast this func to every row of r_mat using list comprehension
def func_list_level2(r_mat, a_array):
    res_mat = np.array([func_vetorized_level1(this_r_row, this_a) 
                        for this_r_row, this_a in zip(r_mat, a_array)])
    return res_mat

# faster with direct masking, but it does more calculation than necessary
def f_faster(r_mat,a_array):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)

    out[row_mask & elem_mask] = (r_mat - a)[row_mask & elem_mask]
    out[~row_mask & elem_mask] = (r_mat + a)[~row_mask & elem_mask]
    out[row_mask & ~elem_mask] = np.nan
    out[~row_mask & ~elem_mask] = 0
    
    return out

# fastest with ufunc in numpy as suggested by @mad_physicist
def f_fastest(r_mat,a_array):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)


    np.subtract(r_mat, a, out=out, where=row_mask & elem_mask)
    np.add(r_mat, a, out=out, where=~row_mask & elem_mask)
    out[row_mask & ~elem_mask] = np.nan
    out[~row_mask & ~elem_mask] = 0
    
    return out

Is it possible to plug a user-defined function into the fastest approach, or otherwise take advantage of it? I thought about using indexing, but found it challenging because the elements selected with [row_ind, col_ind] come back as a 1D array. I see that the selected elements could be put back into a matrix using reshape, but is there a more elegant way? Ideally, the r_mat + a operation could be replaced by a user-defined function.

Tags: python, numpy, conditional-statements, vectorization

Solution


You absolutely can have a vectorized solution with a user-defined function, as long as that function is vectorized to work element-wise on a 1D array (which should be the case for anything written using numpy functions out of the box).

Let's say you have r_mat as an (m, n) matrix and a_array as an (m,) vector. You can write your function to accept hooks. Each hook can be a constant or a callable. If it is a callable, it gets called with two arrays of the same length, and must return a third array of the same length. You can change that contract to include indices or whatever you want at will:

def f(r_mat, a_array, hook11, hook01, hook10, hook00):
    a = a_array[:, None]  # to column vector

    row_mask = (r_mat.mean(axis=1) > 2)[:,None]
    elem_mask = r_mat >= a

    out = np.empty_like(r_mat)

    def apply_hook(mask, hook):
        r, c = np.nonzero(mask)
        out[r, c] = hook(r_mat[r, c], a_array[r]) if callable(hook) else hook

    apply_hook(row_mask & elem_mask, hook11)
    apply_hook(~row_mask & elem_mask, hook01)
    apply_hook(row_mask & ~elem_mask, hook10)
    apply_hook(~row_mask & ~elem_mask, hook00)

    return out

The current configuration in your code would be called like

f(r_mat, a_array, np.subtract, np.add, np.nan, 0)
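As a sanity check, the hook-based function can be compared against the row-by-row loop from the question. The sketch below repeats f so that it runs on its own; the helper name reference, the smaller array size, and the use of np.random.default_rng are assumptions made for illustration, not part of the original code.

```python
import numpy as np

def f(r_mat, a_array, hook11, hook01, hook10, hook00):
    a = a_array[:, None]  # to column vector
    row_mask = (r_mat.mean(axis=1) > 2)[:, None]
    elem_mask = r_mat >= a
    out = np.empty_like(r_mat)

    def apply_hook(mask, hook):
        r, c = np.nonzero(mask)
        out[r, c] = hook(r_mat[r, c], a_array[r]) if callable(hook) else hook

    apply_hook(row_mask & elem_mask, hook11)
    apply_hook(~row_mask & elem_mask, hook01)
    apply_hook(row_mask & ~elem_mask, hook10)
    apply_hook(~row_mask & ~elem_mask, hook00)
    return out

def reference(r_mat, a_array):
    # Hypothetical helper: the slow per-row logic from the question.
    rows = []
    for r_row, a in zip(r_mat, a_array):
        if r_row.mean() > 2:
            rows.append(np.where(r_row >= a, r_row - a, np.nan))
        else:
            rows.append(np.where(r_row >= a, r_row + a, 0))
    return np.array(rows)

# Smaller input than in the question, so the loop finishes quickly.
rng = np.random.default_rng(0)
r_mat = np.linspace(0, 4, 5) * rng.uniform(0.9, 1.1, (1000, 5))
a_array = 2 * rng.uniform(0.9, 1.1, 1000)

hooked = f(r_mat, a_array, np.subtract, np.add, np.nan, 0)
looped = reference(r_mat, a_array)

# equal_nan=True treats NaN entries at the same position as equal.
matches = np.allclose(hooked, looped, equal_nan=True)
print(matches)
```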

Let's say you wanted to do something more complex than np.subtract. You could do for example:

def my_complicated_func(r, a):
    # r and a are 1D arrays of equal length; the result has the same length
    return np.cumsum(r) - 3 * r // a + np.exp(a)

f(r_mat, a_array, my_complicated_func, np.add, np.nan, 0.0)

The key is that my_complicated_func operates on arrays. It will be passed the selected elements of r_mat as a 1D array, together with the matching elements of a_array, each repeated once per selected element in its row.
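To make the shape of the hook's inputs concrete, here is a small sketch of the indexing step that apply_hook performs, on a made-up 2 x 3 matrix (the values are assumptions chosen only for illustration). It also shows why no reshape is needed: the same (r, c) index pair that pulls the elements out as a 1D array puts the results back in place.

```python
import numpy as np

# Toy data, chosen only to make the selection easy to follow.
r_mat = np.array([[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]])
a_array = np.array([2.0, 5.0])

mask = r_mat >= a_array[:, None]   # element-wise condition, shape (2, 3)
r, c = np.nonzero(mask)            # row and column indices of the True entries

r_selected = r_mat[r, c]           # 1D array of the selected elements
a_selected = a_array[r]            # a value of the owning row, once per element

# Write the hook result back through the same indices; no reshape required.
out = np.zeros_like(r_mat)
out[r, c] = np.subtract(r_selected, a_selected)

print(r_selected)
print(a_selected)
print(out)
```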

You could also do the same thing with the function being aware of the index of each location. Just call hook as hook(r_mat[r, c], a_array[r], r, c). Now the hook functions must accept two additional arguments. The original code would be equivalent to

f(r_mat, a_array, lambda r, a, *args: np.subtract(r, a), lambda r, a, *args: np.add(r, a), np.nan, 0)
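A minimal sketch of the index-aware variant might look like the following. The names f_indexed and scale_by_column are hypothetical, as is the toy input; the only change from f is that callable hooks receive the row and column indices as two extra arguments.

```python
import numpy as np

def f_indexed(r_mat, a_array, hook11, hook01, hook10, hook00):
    a = a_array[:, None]  # to column vector
    row_mask = (r_mat.mean(axis=1) > 2)[:, None]
    elem_mask = r_mat >= a
    out = np.empty_like(r_mat)

    def apply_hook(mask, hook):
        r, c = np.nonzero(mask)
        # Callable hooks also get the indices of the selected elements.
        out[r, c] = hook(r_mat[r, c], a_array[r], r, c) if callable(hook) else hook

    apply_hook(row_mask & elem_mask, hook11)
    apply_hook(~row_mask & elem_mask, hook01)
    apply_hook(row_mask & ~elem_mask, hook10)
    apply_hook(~row_mask & ~elem_mask, hook00)
    return out

def scale_by_column(r_vals, a_vals, rows, cols):
    # Hypothetical hook: uses the column index of each selected element.
    return (r_vals - a_vals) * cols

# Toy input: the first row has mean 3 (> 2), the second has mean 1.
r_mat = np.array([[1.0, 3.0, 5.0],
                  [0.0, 1.0, 2.0]])
a_array = np.array([2.0, 1.0])

result = f_indexed(
    r_mat, a_array,
    scale_by_column,                     # row mean > 2 and r >= a
    lambda r, a, *args: np.add(r, a),    # row mean <= 2 and r >= a
    np.nan,                              # row mean > 2 and r < a
    0,                                   # row mean <= 2 and r < a
)
print(result)
```

Hooks that do not care about position can swallow the extra arguments with *args, as the lambda above does.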
