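Suppose we split a number into separate domains. For example, 100 is split into [0, 25] [25, 50] [50, 75] [75, 100]. We then send each of these 4 lists to one of 4 separate processes to be computed, and recombine the answers as a single unit for the number 100. We repeat this many times in succession, as needed, to work through 1000 numbers, each one split into separate domains like [0, 25] [25, 50] [50, 75] [75, 100]. An efficiency problem arises if we have to close the processes in order to make them act as a single group unit working toward one answer. Because Windows is garbage at running processes compared with Unix, we are forced to use the "spawn" start method instead of fork. The spawn method is slow at creating processes, so I thought: why not keep the processes open and pass data to and from them, without opening and closing them for every iteration group of parallel processes? The example code below does this. It keeps the processes open as Consumer class instances whose run() method (in a while loop) constantly requests the next_task with .get() on a joinable queue: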

import multiprocessing

class Consumer(multiprocessing.Process):

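    def __init__(self, task_queue, result_queue):
        # Initialize the Process base class before setting attributes.
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue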

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        # Start the range at 1: range(self.b) would begin with self.a % 0.
        for i in range(1, self.b):
            if self.a % i == 0:
                return 0
        return 1

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count() 
    # Make a list of Consumer object process' ready to be opened.
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]

    for w in consumers:
        w.start() # Open each Consumer process; its run() loop starts waiting on .get().

    # Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.

    # We start to .get() the results in a different loop-
    for _ in range(num_jobs):  # -so the above loop enqueues all jobs without- 
        result = results.get() # -waiting for the previous .put() to .get() first.
    # Add a poison pill for each consumer
    for i in range(num_consumers): # We only do this when all computation is done.
        tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.

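This code is only an example. In other variations of this code, where more iterations of tasks.put() and results.get() are implemented, there needs to be a way for an enqueued Task(object) to be told from outside to return early, before it has fully computed its answer. That would free up resources when you already have the answer from one of the "other processes" working on that single split number group. The __call__ method needs to exist for Task(object) to work as a callable when it is enqueued with tasks.put(Task(i, 100)). For the past two weeks I have been trying to figure out an efficient way to do this. Do I need to take a completely different approach? Don't misunderstand my dilemma: I have code that works, just not with the efficiency I would like on Microsoft Windslows. Any help would be greatly appreciated.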

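Doesn't the Task(object) live in the same process as the Consumer() that takes it off the queue? If so, can't I tell all the Consumer() run() processes to stop the Task(object) they are currently running without shutting down their while loop (with the poison pill), so that they can immediately accept another Task() without having to close and reopen their processes? It really adds up and wastes time when you are opening and closing thousands of processes for an iterative computation. I have tried using Events(), Managers(), and other Queues(). There doesn't seem to be an efficient way to intervene from outside and make a Task(object) return immediately to its parent Consumer(), so that it stops wasting resources on computation once an answer returned by one of the other Consumers() has made the remaining tasks irrelevant, since they all work together as a unified computation on a single number split into groups.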
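
For reference, the domain split described at the top of the question can be sketched as a small helper. This is only an illustration: split_domains, limit, and parts are hypothetical names that do not appear in the code above, and the sketch assumes the number divides into contiguous ranges whose endpoints are shared, as in the [0, 25] [25, 50] example.

```python
def split_domains(limit, parts):
    """Split 0..limit into `parts` contiguous (start, end) pairs.

    Neighbouring pairs share an endpoint, as in the question's example,
    so a worker would normally treat each pair as half-open: [start, end).
    """
    # Integer arithmetic keeps the boundaries exact even when
    # limit is not a multiple of parts.
    bounds = [limit * k // parts for k in range(parts + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


if __name__ == '__main__':
    # One (start, end) pair per worker process.
    for start, end in split_domains(100, 4):
        print(start, end)
```

Each pair could then be wrapped in a Task and put on the task queue, one per Consumer.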

from multiprocessing import Pool
import time

def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(answer):
    global solution
    # gets all the returned results from submitted tasks
    # since we are just interested in the first returned result, write it to the queue:
    solution = answer
    pool.terminate() # kill all tasks

if __name__ == '__main__':
    t = time.time()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for all tasks to terminate:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - t)


Total elapsed time: 1.1378364562988281


from multiprocessing import Pool
from queue import Queue
import time

def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(solution):
    # gets all the returned results from submitted tasks
    # since we are just interested in the first returned result, write it to the queue:
    q.put_nowait(solution)

if __name__ == '__main__':
    t = time.time()
    q = Queue()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for first returned result from callback:
    solution = q.get()
    pool.terminate() # kill all tasks in the pool
    print('Total elapsed time:', time.time() - t)


Total elapsed time: 1.1355643272399902


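Even under Windows, the time to create and recreate the pool may be relatively insignificant compared with the time the tasks take to complete, especially for later iterations, i.e. larger values of n. If you are calling the same worker function each time, a third approach is to use the pool method imap_unordered. I have also included some code to measure the overhead of starting a new pool instance on my desktop: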

from multiprocessing import Pool
import time

def worker(x):
    time.sleep(x) # emulate working on the problem
    return 9 # the solution

if __name__ == '__main__':
    # POOLSIZE = multiprocessing.cpu_count()
    POOLSIZE = 8 # on my desktop
    # how long does it take to start a pool of size 8?
    t1 = time.time()
    for i in range(16):
        pool = Pool(POOLSIZE)
    t2 = time.time()
    print('Average pool creation time: ', (t2 - t1) / 16)

    # POOLSIZE number of calls:
    arguments = [7, 6, 1, 3, 4, 2, 9, 6]
    pool = Pool(POOLSIZE)
    t1 = time.time()
    results = pool.imap_unordered(worker, arguments)
    it = iter(results)
    first_solution = next(it)
    t2 = time.time()
    print('Total elapsed time:', t2 - t1)


Average pool creation time:  0.053139880299568176
Total elapsed time: 1.169790506362915
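
To put those two figures side by side, here is a rough back-of-the-envelope calculation. It assumes one pool is created per iteration and that the elapsed time above is representative of one iteration's work; creation_overhead_fraction is a hypothetical helper name, not part of the code above.

```python
def creation_overhead_fraction(pool_creation_s, task_s):
    """Fraction of one iteration's wall time spent starting the pool."""
    return pool_creation_s / (pool_creation_s + task_s)


if __name__ == '__main__':
    # Both numbers are copied from the timing output above.
    fraction = creation_overhead_fraction(0.053139880299568176,
                                          1.169790506362915)
    print(f'Pool start-up share of one iteration: {fraction:.1%}')
```

On these numbers pool start-up is a few percent of the iteration, and the share shrinks further as the tasks get longer.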

Update 2


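I can think of only one other possibility, which I present below using your multiprocessing approach. A multiprocessing shared-memory variable named stop is made available to each process as a global variable and is set to 0 before each iteration. When a task is about to return 0, which makes the tasks still running in the other processes pointless, it sets stop to 1. This means tasks must periodically check the value of stop and return if it has been set to 1. Of course, this adds extra cycles to the processing. In the demo below I actually have 100 tasks queued up for 8 processors. But the last 92 tasks will immediately discover that stop has been set and should return on the first iteration.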

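By the way: the original code used a multiprocessing.JoinableQueue instance rather than a multiprocessing.Queue to queue up the tasks, and called task_done on that instance as messages were taken off the queue. Yet join was never called on this queue (it would tell you when every message had been taken off and acknowledged), which defeats the whole purpose of having such a queue. In fact, there is no need for a JoinableQueue, since the main process has submitted num_jobs jobs and expects num_jobs messages on the results queue, so it can simply loop and pull the expected number of results from the results queue. I have replaced the JoinableQueue with a simple Queue, keeping the original code but commented out. Also, the Consumer processes can be created as daemon processes (with argument daemon=True), and they will then terminate automatically when all the non-daemon processes, i.e. the main process, terminate, which removes the need for the special "poison pill" None task messages. I have made that change and, again, left the original code in place but commented out for comparison.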

import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue, stop):
        # make ourself a daemon process:
        multiprocessing.Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.stop = stop

    def run(self):
        global stop
        stop = self.stop
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                #self.task_queue.task_done()
                break
            answer = next_task()
            #self.task_queue.task_done()
            self.result_queue.put(answer)
        # return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        global stop
        # start the range from 1 to avoid dividing by 0:
        for i in range(1, self.b):
            # how frequently should this check be made?
            if stop.value == 1:
                return 0
            if self.a % i == 0:
                stop.value = 1
                return 0
        return 1

if __name__ == '__main__':
    # Establish communication queues
    #tasks = multiprocessing.JoinableQueue()
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count()

    # Make a list of Consumer object process' ready to be opened.
    stop = multiprocessing.Value('i', 0)
    consumers = [ Consumer(tasks, results, stop) for i in range(num_consumers) ]

    for w in consumers:
        w.start() # Open each Consumer process; its run() loop starts waiting on .get().

    # Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
    # many more jobs than processes, but they will stop immediately once they check the value of stop.value:
    num_jobs = 100
    stop.value = 0 # make sure it is 0 before an iteration
    for i in range(num_jobs):
        tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.

    # We start to .get() the results in a different loop-
    results = [results.get() for _ in range(num_jobs)]
    print(results) # a 0 anywhere in the list means some task found a divisor

    # Add a poison pill for each consumer
    for i in range(num_consumers): # We only do this when all computation is done.
        tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.


[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
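
The comment in Task.__call__ asks how frequently stop should be checked. One possible answer, offered only as a sketch, is to read the shared value once every so many loop iterations instead of on every pass. The names has_divisor and check_every are hypothetical, the default of 1000 is an arbitrary assumption rather than a measured optimum, and the function takes its range and flag as arguments so that it can be tried outside a worker process.

```python
import multiprocessing


def has_divisor(a, lo, hi, stop, check_every=1000):
    """Return 0 if a has a divisor in [lo, hi) or another worker set stop,
    otherwise return 1 (same 0/1 convention as Task.__call__).

    `stop` is any object with a .value attribute, for example a
    multiprocessing.Value('i', 0).
    """
    for n, i in enumerate(range(lo, hi)):
        # Reading the shared value takes a lock, so only do it on
        # every check_every-th iteration (including the very first).
        if n % check_every == 0 and stop.value == 1:
            return 0
        if a % i == 0:
            stop.value = 1  # tell the other workers to give up
            return 0
    return 1


if __name__ == '__main__':
    stop = multiprocessing.Value('i', 0)
    print(has_divisor(91, 2, 91, stop), stop.value)
```

A larger check_every means less locking overhead but a longer delay before the other workers notice that an answer has been found, so the right value depends on how expensive one loop iteration is.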

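
For comparison, this is roughly how a JoinableQueue is meant to be used when join is actually wanted: every get() is matched by a task_done(), and join() returns once all of them have been acknowledged. It is a single-process sketch so that it stays short; process_all is a hypothetical name and doubling the item is just a stand-in for real work.

```python
import multiprocessing


def process_all(task_queue, count):
    """Take `count` items off the queue, acknowledging each one."""
    doubled = []
    for _ in range(count):
        item = task_queue.get()
        doubled.append(item * 2)  # stand-in for the real computation
        task_queue.task_done()    # exactly one task_done() per get()
    return doubled


if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    for i in range(3):
        tasks.put(i)
    print(process_all(tasks, 3))
    # join() blocks until every item that was put() has been
    # acknowledged with task_done(); here it returns immediately.
    tasks.join()
    print('all tasks acknowledged')
```

If the main process already counts results coming back on a separate results queue, as in the code above, this bookkeeping adds nothing and a plain Queue is enough.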