首页 > 解决方案 > 在类上注册一个方法并在多处理中使用它不起作用

问题描述

假设我有一个lib.py带有基类的模块A和一个register允许用户向该类添加其他方法的函数

例如 lib.py

class A:
    def test1(self):
        return 1

def register(fn, cls):
    setattr(cls, fn.__name__, fn)


def parallel1(a):
    return a.test1()

def parallel2(a):
    return a.test2()

现在main.py我添加了一个简单的方法,叫做test2

main.py

from lib import A, register

def test2(self):
    return 2

register(test2, A)

a = A()
print(a.test2())  # it works, it prints 2

问题是当我将它与多处理一起使用时,寄存器功能不起作用。我怀疑这是因为当作业被发送到子进程时,类 A 是从 lib.py 重新导入的,它还没有测试方法。

from lib import parallel1, parallel2
from concurrent.futures import ProcessPoolExecutor


with ProcessPoolExecutor() as executor:
    result = [*executor.map(parallel1, (A() for i in range(10)))]  # this works 

with ProcessPoolExecutor() as executor:
    result = [*executor.map(parallel2, (A() for i in range(10)))]  # this does NOT work

关于如何解决它的任何想法?PS:我知道我可以在并行函数中将新函数注册到类中,但是有一些原因我不能这样做非常感谢

标签: pythonmultiprocessing

解决方案


您应该使用您正在运行的平台标记您的问题,作为与您要求的多处理问题相关的问题的指南。

解决方案是确保池中的进程执行初始化池所需的注册;

from workers import A, register, parallel1, parallel2

def test2(self):
    return 2

def init_pool():
    register(test2, A)

#required for Windows:
if __name__ == '__main__':

    from concurrent.futures import ProcessPoolExecutor

    # initialize each process in the pool using function init_pool:
    with ProcessPoolExecutor(initializer=init_pool) as executor:
        result1 = [*executor.map(parallel1, (A() for i in range(10)))]
        print(result1)
        result2 = [*executor.map(parallel2, (A() for i in range(10)))]
        print(result2)

印刷:

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2]

笔记

你也可以侥幸逃脱:

from workers import A, register, parallel1, parallel2

def test2(self):
    return 2

register(test2, A)

if __name__ == '__main__':

    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor() as executor:
        result1 = [*executor.map(parallel1, (A() for i in range(10)))]
        print(result1)
        result2 = [*executor.map(parallel2, (A() for i in range(10)))]
        print(result2)

例如,在 Windows 上,生成的每个新进程都将从程序顶部开始执行,并执行包括register(test2, A)语句在内的全局范围内的任何内容。但是明确地将其作为池初始化程序运行,使得该语句对池中进程的初始化的重要性更加清晰,并且它也不需要由主进程执行。


推荐阅读