python - python类中的多处理
问题描述
我正在尝试进行并行计算以加快 for 循环(我已经在使用 itertools,我需要更快的速度,因为我多次执行 for 循环)。我是多处理的新手。我已经检查了几个关于堆栈溢出的问题,我试图解决我的问题,但是我仍然遇到了一些困难。我正在创建共享变量(self.A、self.B、self.C),以便有效地完成多处理。但是,我认为我做错了,因为当我在计算后检查我的变量时,我发现它们没有改变。我的代码有点复杂,所以下面的代码是演示我的问题的示例代码。谢谢你的帮助!
import numpy as np
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools
class F():
def __init__(self, num_process=4):
self.num_process = num_process
self.idx = list(itertools.product(range(5), range(10)))
self.A = np.zeros((5, 10))
if self.num_process > 1:
self.A = np.frombuffer(Array(c_double, self.A.flat, lock=False))
self.A.resize(5,10)
def solve(self):
self.B = np.zeros((10, 5, 10))
self.C = np.zeros((10, 5, 10))
if self.num_process > 1:
self.B = np.frombuffer(Array(c_double, self.B.flat, lock=False))
self.B.resize(10,5,10)
self.C = np.frombuffer(Array(c_double, self.C.flat, lock=False))
self.C.resize(10,5,10)
print('Before=',self.A,self.B,self.C)
for i in range(10):
if self.num_process == 1:
for (k,l) in self.idx:
self.B[i,k,l]=1
self.C[i,k,l]=1
else:
workers = []
for worker_num in range(self.num_process):
worker = Process(target=F.update,
args=(i, worker_num, self.num_process,
self.idx, self.A, self.B, self.C))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()
print('After=',self.A,self.B,self.C)
@staticmethod
def update( i, worker_num, num_process, idx, A, B, C):
start_num = int(len(idx) * (worker_num/num_process))
end_num = int(len(idx) * ((worker_num+1)/num_process))
for j in range(start_num, end_num):
k,l = idx[j]
B[i,k,l]=min(2,A[k,l])
C[i,k,l]=2
if __name__ == '__main__':
var=F()
var.solve()
当我在计算后打印我的变量时,我发现它们没有改变。
更新
我能够更正我的代码并使用以下代码进行多处理。正如 Ricky Kim 所指出的,我的错误是我没有创建共享变量。下面的代码实现了这一点,但它仍然比使用 1 个进程慢得多(当然要执行相同的操作)。关于如何使多处理更快、更有效的任何想法。谢谢!
import numpy as np
import multiprocessing as mp
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools
class F():
def __init__(self, num_process=4):
self.num_process = num_process
self.idx = list(itertools.product(range(5), range(10)))
self.A = np.zeros((5, 10))
def solve(self):
B_shared = Array(c_double, 10*5*10)
C_shared = Array(c_double, 10*5*10)
self.B = np.frombuffer(B_shared.get_obj())
self.B = self.B.reshape(10,5,10)
self.C = np.frombuffer(C_shared.get_obj())
self.C = self.C.reshape(10,5,10)
print('Before=',self.A,self.B,self.C)
for i in range(10):
if self.num_process == 1:
# perform some expensive operation
for (k,l) in self.idx:
self.B[i,k,l]=1
self.C[i,k,l]=1
else:
workers = []
for worker_num in range(self.num_process):
worker = Process(target=self.update,
args=(i, worker_num, self.num_process, B_shared, C_shared))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()
print('After=',self.A,self.B,self.C)
def update(self, i, worker_num, num_process, B_shared, C_shared):
B = np.frombuffer(B_shared.get_obj())
B = B.reshape((10,5,10))
C = np.frombuffer(B_shared.get_obj())
C = C.reshape((10,5,10))
start_num = int(len(self.idx) * (worker_num/num_process))
end_num = int(len(self.idx) * ((worker_num+1)/num_process))
for j in range(start_num, end_num):
# perform some expensive operation
k,l = self.idx[j]
B[i,k,l]=min(2,self.A[k,l])
C[i,k,l]=2
if __name__ == '__main__':
mp.freeze_support()
var=F()
var.solve()
解决方案
推荐阅读
- azure - 无法从 Azure 函数应用访问私钥证书
- python - 在 def init PyTorch 中获取 CNN 层输出大小
- angular - 如何在iphone,safari中使用角度实现长按(点击并按住)
- java - 在读取整数或字符串时停止扫描仪
- java - 如何在 Java 中返回子字符串的第二个实例?
- android - Android soong 没有 android.PathForSourceRelaxed 和 ctx.Config().BuildOSTarget
- reactjs - 自定义动画反应导航底部选项卡导航器
- game-engine - 如何在 Bevy 中操作渲染图
- python - 二叉搜索树最大高度
- c# - 数组中的元素(从字典值复制时)被反转