首页 > 解决方案 > 单个队列上的多处理

问题描述

我对 Python 中的多处理哲学感到有些困惑。为了测试我的知识,我想到了一个计算整数的素数分解的多处理程序。它如下。将整数放入队列中。然后,我有一个函数可以出列并搜索它的(主要)除数。如果找到一个,则将互补整数放回队列中。我怎样才能使这项工作。目前我有这个:

import multiprocessing as mp

def f(queue, decomp):
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomp.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomp.put(x)

class Num:
    def __init__(self, n):
        self.queue = mp.Queue()
        self.queue.put(n)
        self.decomposition = mp.Queue()

    def run(self):
        with mp.Pool(4) as pool:
            pool.apply_async(f, (self.queue, self.decomposition))

它提出了

RuntimeError: Queue objects should only be shared between processes through inheritance

制作这个的标准方法是什么?(我知道可能有更好的方法来进行素数分解)

标签: pythonmultiprocessing

解决方案


为了使用multiprocessing.Queue,您需要将它作为创建它们的点传递给每个子进程(因此它们被“继承”),而不是将它们作为参数传递给apply_async. 如果你在 Linux 上,你可以通过在全局范围内声明它们来做到这一点,而不是作为Num类上的实例变量——它们将通过分叉过程被继承:

import multiprocessing as mp

queue = mp.Queue()
decomposition = mp.Queue()

def f():
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomposition.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomposition.put(x)
    
class Num:
    def __init__(self, n):
        queue.put(n)

    def run(self):
        with mp.Pool(4) as pool:
            pool.apply(f)

在 Windows 上,它有点复杂,因为它不支持分叉。相反,您必须在构造函数中使用initandinitargs关键字参数Pool将队列传递给子进程,然后在您提供的初始化函数中将它们声明为全局变量。这会将队列放在工作进程的全局范围内,允许您在传递给所有Pool方法(map/ map_asyncapply/ apply_async)的函数中使用它们。

import multiprocessing as mp

def f():
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomp.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomp.put(x)


def init(q, d):
    # Put the queues in the global scope of the worker processes
    global queue, decomp
    queue = q
    decomp = d        

class Num:
    def __init__(self, n):
        self.queue = mp.Queue()
        self.queue.put(n)
        self.decomposition = mp.Queue()

    def run(self):
        with mp.Pool(4, initializer=init, initargs=(self.queue, self.decomposition)) as pool:
            pool.apply(f)

推荐阅读