python - 使用 python 多进程的 get() 函数偶尔出现超时问题
问题描述
我对 python 比较陌生,但我有一个我无法理解的问题。
我正在尝试对计算 mandelbrot 分形的一段代码实现多处理。我的问题是,它似乎可以正常工作并且不能随机工作,我不明白为什么。
我已将问题缩小到具体的 .get() 函数,我正在运行该函数以从“ApplyResult”多处理对象中获取数据。这个函数似乎会超时并永远运行,或者直到它被终止。有时代码会运行一次或两次,然后停止工作,这向我暗示了某种正在使用但未释放的端口,但我尝试添加到代码中以解决此问题的所有内容似乎都没有帮助。
我尝试删除“if name = main”部分,这似乎没有什么区别,我尝试了在多处理文档中可以找到的其他函数来释放端口,包括 .join() .kill() 、.terminate( ),以及完全不使用 close 的尝试。我的代码的相关部分如下所示
import numpy as np
import multiprocessing as mp
import os
I = 100
T = 2
N = 100
C_rerow = np.reshape(np.linspace(-2, 1, N), (1, N))
C_imrow = np.reshape(np.linspace(1.5, -1.5, N), (N, 1))
C = C_rerow + C_imrow*1j
def mandel(C_row):
Mandelbrot = np.ones_like(np.absolute(C_row))
Z_old = np.zeros_like(C[:,0])
for i in range(I):
Z_new = Z_old**2 + C_row
Z_old = Z_new
Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
Mandelbrot[Bool] = i/I
return Mandelbrot
def MultiP(C, I, T):
pool = mp.Pool(processes=os.cpu_count() - 1)
Mc = []
for j in range(N):
C_row = C[j,:]
test = pool.apply_async(mandel, (C_row,))
Mc.append(test)
pool.close()
return Mc
if __name__ == '__main__':
Mc = MultiP(C, I, T)
Mc = [Mc[i].get(timeout = 5) for i in range(len(Mc))]
另一个我无法理解的奇怪问题是,如果我将 MultiP 的输出重命名为与函数返回的不同的名称(例如,将 Mc = MultiP() 重命名为 MC = MultiP(),那么返回的值将不会得救了。我以前从未见过函数发生过这样的事情,我不确定这些问题是否相关,但我想我会提到它。
任何输入将不胜感激。
解决方案
这对于评论来说有点长,也许不是一个答案。底线是我无法重现您的错误,但我确实有一些评论。
创建新进程的代码需要if __name__ == '__main__':
在那些使用操作系统的平台上由测试有条件地执行,spawn
而不是fork
创建新进程的方法。在这种情况下,新进程不会像创建子进程时那样继承主进程地址空间的副本,而是从程序顶部开始执行,重新执行在全局范围内找到的所有内容。如果不是为了if __name__ == '__main__':
测试,它会递归地尝试在无限循环中创建更多的子流程。因此,最好不要在全局范围内完成实际上不需要的复杂计算,因为它们将由多处理池中的每个子进程重新执行。最好将这些计算移到if __name__ == '__main__':
堵塞。如果您将要调用的工作函数需要访问这些值,您可以将 then 作为函数参数传递(如果这些是大量数据,这可能会很昂贵),或者在每个子进程的地址空间使用全局变量初始化一次,如在下面完成。
我还尝试重新创建您在将返回值从MultiP
from重命名为Mc
to时引用的问题,MC
但没有问题。我还纠正了您的缩进错误。
当您执行pool.close()
后面的序列时pool.join()
,您将阻塞,直到所有提交的任务完成。因此,如果您已经提交了异步任务apply_async
并且不需要从AsyncResult
创建的返回实例中获取任何返回值,您可以使用close
并join
确保任务已完成执行。如果您get
在返回的AsyncResult
实例上使用方法,则还可以保证任务已完成(或在您的情况下超时),在这种情况下,实际上不需要发出close
and join
。顺便说一句,只是因为您TimeoutError
在调用时遇到了异常get
表示任务已超时,它实际上仍在运行。大概您不想等待超时任务完成。因此,您应该调用pool.terminate()
以终止任何正在运行的任务(这在with Pool() as poll:
块终止时隐式调用。
请注意我添加的评论。
给你的修辞问题(它们不需要回答,但应该考虑):
你有全局变量
C
,I
,T
,N
. 函数MultiP
接受和作为参数C
,忽略全局变量但访问全局变量。访问它需要的所有东西作为全局变量。这不矛盾吗?I
T
N
mandel
MultiP
包含重复调用的逻辑mandel
。它可以在有或没有多处理的情况下执行此操作。对于多处理,它可以使用apply_async
或者可能更有效的map
方法(如果所有参数都放入一个列表中,并且如果您使用了合适的chunksize参数,结果大于 1)。然而,您返回给调用者的不是最终结果,而是AsyncResult
实例列表。这意味着调用者依赖于MulitiP
.MultiP
既然调用者无论如何都会立即“获取”结果,那么仅仅拥有“获取”并返回结果并减少调用者和被调用者之间的耦合不是更明智吗?
import numpy as np
import multiprocessing as mp
import os
N = 100
I = 100
T = 2
def init_pool(c):
global C
C = c
def mandel(C_row):
Mandelbrot = np.ones_like(np.absolute(C_row))
Z_old = np.zeros_like(C[:,0])
for i in range(I):
Z_new = Z_old**2 + C_row
Z_old = Z_new
Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
Mandelbrot[Bool] = i/I
return Mandelbrot
def MultiP(C, I, T):
# initialize each sub-process's global C variable:
pool = mp.Pool(processes=os.cpu_count() - 1, initializer=init_pool, initargs=(C,))
Mc = []
for j in range(N):
C_row = C[j,:]
test = pool.apply_async(mandel, (C_row,))
Mc.append(test)
# previous 5 statements can be replaced with:
# Mc = [pool.apply_async(mandel, (C[j,:],)) for j in range(N)]
#pool.close() # not required
return Mc
if __name__ == '__main__':
# moved here so the calculations are done once:
C_rerow = np.reshape(np.linspace(-2, 1, N), (1, N))
C_imrow = np.reshape(np.linspace(1.5, -1.5, N), (N, 1))
C = C_rerow + C_imrow*1j
MC = MultiP(C, I, T)
# The following can throw a TimeoutError exception:
MC = [MC[i].get(timeout = 5) for i in range(len(MC))]
print(MC)
印刷:
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
Z_new = Z_old**2 + C_row
test.py:19: RuntimeWarning: overflow encountered in absolute
Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
Z_new = Z_old**2 + C_row
test.py:19: RuntimeWarning: overflow encountered in absolute
Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
[array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
etc.
推荐阅读
- pyspark - 如何在pyspark中找到变异系数(CV)?
- sql - 如何使用 sql DB2 将列转置为行
- ios - iOS Photos App标签栏图标如何从系统中获取该图标
- html - 进入下拉菜单时 HTML 代码中的问题
- regex - 如何从 Google 表格中的字符串中删除一组数字
- hyperledger-fabric-sdk-go - Hyperledger Fabric 服务发现 - 如何获取对等 tls 证书?
- kubernetes - 找不到 Kubernetes 清单的规格
- javascript - 对话框未关闭反应本机
- c# - 在使用 Photon Pun2 的多人游戏中遇到问题(信息游戏玩家)
- json - 如何使用 jq 添加动态生成的数组?