首页 > 解决方案 > 如何基于另一个具有重复索引的数组获取 numpy 数组中的值的总和

问题描述

data_values = np.random.rand(10)
data_ind = np.random.randint(0,10,10)
    
data_values = (array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973  ,
            0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214]),
    
data_ind = array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3]))

期望的输出:

0 - 0.91693   
1 - 0.36218693  
2 - 0.54130794 + 0.22339518  
3 - 0.47195214  
4 - 0.17958372  
5 - 0.57734696  
6 -  0.14956333 + 0.74504583  
output = array([0.916973, 0.36218694, 0.7647031, 0.47195214, 0.17958371, 0.577347, 0.89460915, 0.8144459], dtype=float32)

我已经写了很长的路

nodal_values = np.zeros(8, dtype=np.float32)  
for nodes in range(8):  
    nodal_values[nodes] = np.sum(data_values[np.where(data == nodes)[0]])

上述方法需要很多时间,而

a = ((np.mgrid[:M,:N] == b)[0] * c).sum(axis=1)

为数百万的大数据提供内存错误。

我正在寻找一种优化的方式。

标签: pythonpython-3.xnumpy-ndarray

解决方案


请查看stackoverflow 问题指南,以便提出更好的问题并正确格式化它们。


选项

原始代码

这是您要针对较大值进行优化的内容N(我冒昧地编辑了您的代码,以便它没有硬编码的值并修复了错字,data_values而不是data):

data_values = np.random.rand(N) 
data_ind = np.random.randint(0, N, N)

xsize = data_ind.max() + 1
nodal_values = np.zeros(xsize, dtype=np.float32)  
for nodes in range(xsize):  
    nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

稍微好一点的版本(为了可读性)

我创建了以下版本,它提高了可读性并取消了使用np.where

idx = np.arange(xsize)[:, None] == data_ind
nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)] # Python list

更好的版本

我在这里通过@Divakar 对您的案例实施了接受的答案(一定要检查它以更好地理解它):

_, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
nodal_values = np.bincount(idx, data_values) # Same shape and type as your version

比较

使用您的原始值:

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973, 0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

我使用timeit模块 ( mean ± std. dev. of 7 runs, 10000000 loops each) 获得了以下性能:

Original code: 49.2 +- 11.1 ns
Much better version: 45.2 +- 4.98 ns
Slightly better version: 36.4 +- 2.81 ns

对于非常小的值N,即 1 到 10,没有显着差异。但是,对于大的,使用哪一个是毫无疑问的。带有 for 循环的两个版本都花费了太长时间,而矢量化实现却非常快。

小N比较 大N比较

测试它的代码

import numpy as np
import timeit
import matplotlib.pyplot as plt

def original_code():
    xsize = data_ind.max() + 1
    nodal_values = np.zeros(xsize, dtype=np.float32)
    for nodes in range(xsize):
        nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

def much_better():
    _, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
    nodal_values = np.bincount(idx, data_values)

def slightly_better():
    xsize = data_ind.max() + 1
    idx = np.arange(xsize)[:, None] == data_ind
    nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)]

sizes = [i*5 for i in range(1, 7)]
original_code_times = np.zeros((len(sizes),))
slightly_better_times = np.zeros((len(sizes),))
much_better_times = np.zeros((len(sizes),))
for i, N in enumerate(sizes):
    print(N)
    data_values = np.random.rand(N)
    data_ind = np.random.randint(0, N, N)

    # Divided by 100 repeats to get average
    original_code_times[i] = timeit.timeit(original_code, number=100) / 100
    much_better_times[i] = timeit.timeit(much_better, number=100) / 100
    slightly_better_times[i] = timeit.timeit(slightly_better, number=100) / 100

# Multiply by 1000 to get everything in ms
original_code_times *= 1000
slightly_better_times *= 1000
much_better_times *= 1000

# %%
plt.figure(dpi=120)
plt.title("Small N's")
plt.plot(sizes, original_code_times, label="Original code")
plt.plot(sizes, slightly_better_times, label="Slightly better")
plt.plot(sizes, much_better_times, label="Much better")
plt.ylabel("Time [ms]")
plt.xlabel("N")
plt.xticks(sizes)
plt.legend()
plt.savefig("small_N.png", dpi=120)
plt.show()
plt.close()

我希望这可以帮助任何可能偶然发现这一点的人。


推荐阅读