首页 > 解决方案 > 使用 python 多进程的 get() 函数偶尔出现超时问题

问题描述

我对 python 比较陌生,但我有一个我无法理解的问题。

我正在尝试对计算 mandelbrot 分形的一段代码实现多处理。我的问题是,它似乎可以正常工作并且不能随机工作,我不明白为什么。

我已将问题缩小到具体的 .get() 函数,我正在运行该函数以从“ApplyResult”多处理对象中获取数据。这个函数似乎会超时并永远运行,或者直到它被终止。有时代码会运行一次或两次,然后停止工作,这向我暗示了某种正在使用但未释放的端口,但我尝试添加到代码中以解决此问题的所有内容似乎都没有帮助。

我尝试删除“if name = main”部分,这似乎没有什么区别,我尝试了在多处理文档中可以找到的其他函数来释放端口,包括 .join() .kill() 、.terminate( ),以及完全不使用 close 的尝试。我的代码的相关部分如下所示

import numpy as np
import multiprocessing as mp
import os
I = 100
T = 2
N = 100
C_rerow = np.reshape(np.linspace(-2, 1, N), (1, N))
C_imrow = np.reshape(np.linspace(1.5, -1.5, N), (N, 1))
C = C_rerow + C_imrow*1j 

def mandel(C_row):
    Mandelbrot = np.ones_like(np.absolute(C_row))
    Z_old = np.zeros_like(C[:,0])
    for i in range(I):
        Z_new = Z_old**2 + C_row
        Z_old = Z_new
        Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
    Mandelbrot[Bool] = i/I
return Mandelbrot

def MultiP(C, I, T):
    pool = mp.Pool(processes=os.cpu_count() - 1)
    Mc = []
    for j in range(N):
        C_row = C[j,:]
        test = pool.apply_async(mandel, (C_row,))
        Mc.append(test)
    pool.close()
return Mc
if __name__ == '__main__':
    Mc = MultiP(C, I, T)
    Mc = [Mc[i].get(timeout = 5) for i in range(len(Mc))]

另一个我无法理解的奇怪问题是,如果我将 MultiP 的输出重命名为与函数返回的不同的名称(例如,将 Mc = MultiP() 重命名为 MC = MultiP(),那么返回的值将不会得救了。我以前从未见过函数发生过这样的事情,我不确定这些问题是否相关,但我想我会提到它。

任何输入将不胜感激。

标签: pythonmultiprocessing

解决方案


这对于评论来说有点长,也许不是一个答案。底线是我无法重现您的错误,但我确实有一些评论。

创建新进程的代码需要if __name__ == '__main__':在那些使用操作系统的平台上由测试有条件地执行,spawn而不是fork创建新进程的方法。在这种情况下,新进程不会像创建子进程时那样继承主进程地址空间的副本,而是从程序顶部开始执行,重新执行在全局范围内找到的所有内容。如果不是为了if __name__ == '__main__':测试,它会递归地尝试在无限循环中创建更多的子流程。因此,最好不要在全局范围内完成实际上不需要的复杂计算,因为它们将由多处理池中的每个子进程重新执行。最好将这些计算移到if __name__ == '__main__':堵塞。如果您将要调用的工作函数需要访问这些值,您可以将 then 作为函数参数传递(如果这些是大量数据,这可能会很昂贵),或者在每个子进程的地址空间使用全局变量初始化一次,如在下面完成。

我还尝试重新创建您在将返回值从MultiPfrom重命名为Mcto时引用的问题,MC但没有问题。我还纠正了您的缩进错误。

当您执行pool.close()后面的序列时pool.join(),您将阻塞,直到所有提交的任务完成。因此,如果您已经提交了异步任务apply_async并且不需要从AsyncResult创建的返回实例中获取任何返回值,您可以使用closejoin确保任务已完成执行。如果您get在返回的AsyncResult实例上使用方法,则还可以保证任务已完成(或在您的情况下超时),在这种情况下,实际上不需要发出closeand join。顺便说一句,只是因为您TimeoutError在调用时遇到了异常get表示任务已超时,它实际上仍在运行。大概您不想等待超时任务完成。因此,您应该调用pool.terminate()以终止任何正在运行的任务(这在with Pool() as poll:块终止时隐式调用。

请注意我添加的评论。

给你的修辞问题(它们不需要回答,但应该考虑):

  1. 你有全局变量C, I, T, N. 函数MultiP接受和作为参数C,忽略全局变量但访问全局变量。访问它需要的所有东西作为全局变量。这不矛盾吗?ITNmandel

  2. MultiP包含重复调用的逻辑mandel。它可以在有或没有多处理的情况下执行此操作。对于多处理,它可以使用apply_async或者可能更有效的map方法(如果所有参数都放入一个列表中,并且如果您使用了合适的chunksize参数,结果大于 1)。然而,您返回给调用者的不是最终结果,而是AsyncResult实例列表。这意味着调用者依赖于MulitiP. MultiP既然调用者无论如何都会立即“获取”结果,那么仅仅拥有“获取”并返回结果并减少调用者和被调用者之间的耦合不是更明智吗?

import numpy as np
import multiprocessing as mp
import os

N = 100
I = 100
T = 2

def init_pool(c):
    global C
    C = c

def mandel(C_row):
    Mandelbrot = np.ones_like(np.absolute(C_row))
    Z_old = np.zeros_like(C[:,0])
    for i in range(I):
        Z_new = Z_old**2 + C_row
        Z_old = Z_new
        Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
    Mandelbrot[Bool] = i/I
    return Mandelbrot

def MultiP(C, I, T):
    # initialize each sub-process's global C variable:
    pool = mp.Pool(processes=os.cpu_count() - 1, initializer=init_pool, initargs=(C,))
    Mc = []
    for j in range(N):
        C_row = C[j,:]
        test = pool.apply_async(mandel, (C_row,))
        Mc.append(test)
    # previous 5 statements can be replaced with:
    # Mc = [pool.apply_async(mandel, (C[j,:],)) for j in range(N)]
    #pool.close() # not required
    return Mc

if __name__ == '__main__':
    # moved here so the calculations are done once:
    C_rerow = np.reshape(np.linspace(-2, 1, N), (1, N))
    C_imrow = np.reshape(np.linspace(1.5, -1.5, N), (N, 1))
    C = C_rerow + C_imrow*1j
    MC = MultiP(C, I, T)
    # The following can throw a TimeoutError exception:
    MC = [MC[i].get(timeout = 5) for i in range(len(MC))]
    print(MC)

印刷:

test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: overflow encountered in square
  Z_new = Z_old**2 + C_row
test.py:19: RuntimeWarning: overflow encountered in absolute
  Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:17: RuntimeWarning: invalid value encountered in square
  Z_new = Z_old**2 + C_row
test.py:19: RuntimeWarning: overflow encountered in absolute
  Bool = (np.absolute(Z_new) > T) & (Mandelbrot == 1)
[array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,

etc.

推荐阅读