python - 共享numpy数组时的python多处理
问题描述
我想利用多进程(multiprocessing)来部分更改大型 numpy 数组中的值。
也就是说,我想最后得到[[100, 100, 100], [100, 100, 100]]。
但是下面的代码会出错,它报错 "RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance"(SynchronizedArray 对象只能通过继承在进程之间共享)。
我应该怎么办?谢谢。
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def change_array(array, i, j):
    """Set element ``(i, j)`` of the shared 2x3 array to 100 and print the buffer.

    ``array`` must provide ``get_obj()`` returning a buffer that holds six
    float64 values, e.g. a ``multiprocessing.Array('d', 6)``.
    """
    # np.frombuffer builds a view on the buffer, so the assignment below
    # writes into the shared memory rather than into a private copy.
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    pool = multiprocessing.Pool(processes=3)
    result = []
    for i in range(2):
        for j in range(3):
            # Passing X as a task argument requires pickling it, which is
            # what triggers the RuntimeError quoted in the question.
            result.append(pool.apply_async(change_array, (X, i, j,)))
    result = [r.get() for r in result]
    pool.close()
    pool.join()
    print(np.frombuffer(X.get_obj()).reshape(2, 3))
解决方案
You need to make two changes:
- Use a `multiprocessing.Array` instance with locking (actually, the default) rather than a "plain" `Array`.
- Do not pass the array instance as an argument to your worker function. Instead, you should initialize each process in your pool with the array as a global value.
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def initpool(arr):
    """Pool initializer: store ``arr`` in the module-level global ``array``.

    Runs once in each worker process, so later tasks can reach the shared
    array without receiving it as a task argument.
    """
    global array
    array = arr
def change_array(i, j):
    """Set element ``(i, j)`` of the shared 2x3 array to 100 and print the buffer.

    Reads the shared array from the module-level global ``array``, which
    must have been set beforehand (see ``initpool``).
    """
    # np.frombuffer builds a view on the buffer, so the assignment below
    # writes into the shared memory rather than into a private copy.
    X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    X_np[i, j] = 100
    print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    # Hand X to every worker through the pool initializer instead of
    # sending it along with each task.
    pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))
    result = []
    for i in range(2):
        for j in range(3):
            result.append(pool.apply_async(change_array, (i, j,)))
    # Wait for every task to finish (and re-raise any worker exception).
    result = [r.get() for r in result]
    pool.close()
    pool.join()
    print(np.frombuffer(X.get_obj()).reshape(2, 3))
Prints:
[100. 2.2 3.3 4.4 5.5 6.6]
[100. 100. 3.3 4.4 5.5 6.6]
[100. 100. 100. 4.4 5.5 6.6]
[100. 100. 100. 100. 5.5 6.6]
[100. 100. 100. 100. 100. 6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
[100. 100. 100.]]
Update
Since in this case the values being changed in the `data` array do not depend on the existing values in that array, there is no need for function `change_array` to have access to the array and it can instead, as suggested by Frank Yellin, just return a tuple of the indices to be changed with the new value. But I did want to show you how you would pass the array for those situations where the function did need to access/modify the array. The following code, in this instance, however, is all that you need (I have made a few simplifications):
import numpy as np
import multiprocessing
def change_array(i, j):
    """Return ``(i, j, 100)``: the indices to update and the new value."""
    return i, j, 100
if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
        # Collect the results while the pool is still alive: leaving the
        # ``with`` block terminates the worker processes.
        for r in result:
            i, j, value = r.get()
            # The update is applied here in the parent process, so no
            # shared memory is needed.
            data[i, j] = value
    print(data)
Or:
import numpy as np
import multiprocessing
import itertools
def change_array(t):
    """Unpack the index pair ``t`` and return ``(i, j, 100)``.

    Takes a single tuple argument so it can be used directly with
    ``pool.map``, which passes one item per call.
    """
    i, j = t
    return i, j, 100
if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        # pool.map blocks until every index pair has been processed; the
        # updates themselves are applied here in the parent process.
        for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
            data[i, j] = value
    print(data)
推荐阅读
- java - springboot Thymeleaf 添加不同的列表字段
- docker - 有没有办法创建 Docker 卷并用数据预填充它?
- python - 如何从 Pandas (Python) 中由排序时间序列索引的数据帧中的列中的所有值生成统计特征?
- java - Java:Android UI(Android Studio)和java后端(模型)的交互/连接(与JavaFX相比)
- auth0 - 根据用户的电子邮件地址自动选择 Auth0 DB Connection
- c - 为什么代码只打印指向字符串的指针数组中最后一个指针的值?
- swift - 如何在不使用 Swift 中的新变量的情况下进行向下转换?
- python - 将 multiprocessing.Value 对象传递给 ctype 函数?
- django - 如何解决:AttributeError: 'QuerySet' 对象没有属性 'name'
- docker - Docker for windows 在尝试查看设置时挂起