首页 > 解决方案 > numpy 数组的并行处理问题

问题描述

我在尝试加快程序计算时遇到了问题。在我的代码的序列化 python 版本中,我正在计算一个函数 f(x) 的值,它返回一个浮点数,用于 NumPy 数组的滑动窗口,如下所示:

a = np.array([i for i in range(1, 10000000)]) # Some data here
N = 100
result = []
for i in range(N, len(a)):
    result.append(f(a[i - N:i]))

由于 NumPy 数组非常大并且 f(x) 运行时间很高,因此我尝试应用多处理来加速我的代码。通过我的研究,我发现charm4py 可能是一个很好的解决方案,它具有池功能,可以将数组分解成块并在生成的进程之间分配工作。我已经实现了 charm4py 的多处理示例,然后将其转换为我的案例:

# Split an array into subarrays for sequential processing (takes only 5 seconds)
a = np.array([a[i - N:i] for i in range(N, len(a))])
result = charm.pool.map(f, a, chunksize=512, ncores=-1)
# I'm running this code through "charmrun +p18 example.py"

我遇到的问题是,尽管在更强大的实例(18 个物理核心与 6 个物理核心)上执行代码,但它的运行速度要慢得多。

我预计会看到约 3 倍的改进,但它没有发生。在寻找解决方案时,我了解到昂贵的反序列化/启动新进程会产生一些开销,但我不确定是否是这种情况。

我非常感谢任何关于如何实现 NumPy 数组的快速并行处理的反馈或建议(假设函数 f(x) 未矢量化,需要很长时间来计算,并且在内部产生大量特定/个体无法并行化的调用)?

谢谢!

标签: pythonnumpypython-multiprocessingraycharm++

解决方案


听起来您正在尝试将此操作与 Charm 或 Ray 并行化(尚不清楚如何将两者一起使用)。

如果您选择使用 Ray,并且您的数据是一个 numpy 数组,则可以利用零拷贝读取来避免任何反序列化开销。

您可能想稍微优化一下滑动窗口功能,但它可能看起来像这样:

@ray.remote
def apply_rolling(f, arr, start, end, window_size):
    results_arr = []
    for i in range(start, end - window_size):
        results_arr.append(f(arr[i : i + windows_size])
    return np.array(results_arr)

请注意,这种结构允许我们f在单个任务中多次调用(也称为批处理)。

要使用我们的功能:

# Some small setup
big_arr = np.arange(10000000)
big_arr_ref = ray.put(big_arr)

batch_size = len(big_arr) // ray.available_resources()["CPU"]
window_size = 100

# Kick off our tasks
result_refs = []
for i in range(0, big_arr, batch_size):
    end_point = min(i + batch_size, len(big_arr))
    ref = apply_rolling.remote(f, big_arr_ref, i, end_point)
    result_refs.append(ref)


# Handle the results
flattened = []
for section in ray.get(result_refs):
    flattened.extend(section)

我确定您会想要自定义此代码,但这里有 2 个您可能想要维护的重要且不错的属性。

批处理:我们使用批处理来避免启动太多任务。在任何系统中,并行化都会产生开销,因此我们总是要小心并确保我们不会启动太多任务。此外,我们正在计算batch_size = len(big_arr) // ray.available_resources()["CPU"]以确保我们使用与 CPU 完全相同的批次数。

共享内存:由于 Ray 的对象存储支持从 numpy 数组进行零拷贝读取,因此从 numpy 数组调用ray.get或读取几乎是免费的(在没有网络成本的单台机器上)。虽然序列化/调用有一些开销ray.put,所以这种方法只调用put(昂贵的操作)一次,并且ray.get(隐式调用)多次。

提示:将数组作为参数直接传递给远程函数时要小心。即使您传递同一个对象,它也会ray.put多次调用。


推荐阅读