python - 在 Windows 上高效地进行 Python 多处理
问题描述
假设我们将一个数字拆分为不同的域:例如:100 拆分为:[0, 25] [25, 50] [50, 75] [75, 100]。然后我们将这 4 个列表中的每一个发送到 4 个单独的过程中的一个进行计算,然后将答案重新组合为数字 100 的单个拆分单元。我们根据需要连续多次迭代这个过程' 以将 1000 个数字作为一个单位工作,这些数字被分成类似于 [0, 25] [25, 50] [50, 75] [75, 100] 的单独域。如果我们必须关闭进程以使它们充当为答案而处理的单个组单元,则会出现效率问题。由于 windows 在运行进程方面与 Unix 相比是垃圾,我们被迫使用“spawn”方法而不是 fork。spawn 方法在生成过程中很慢'所以我想为什么不保留这个过程' 打开并从它们传递数据,而无需为并行过程的每个迭代组打开和关闭它们。下面的示例代码将执行此操作。它将保持进程'作为类消费者打开,这些消费者将不断地使用 run()(在 while 循环中)请求带有 .get() 可加入队列的 next_task:
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill shutdown of .get() loop with break
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
for i in range(self.b):
if self.a % i == 0:
return 0
return 1
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Number of consumers equal to system cpu_count
num_consumers = multiprocessing.cpu_count()
# Make a list of Consumer object process' ready to be opened.
consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.
# We start to .get() the results in a different loop-
for _ in range(num_jobs): # -so the above loop enqueues all jobs without-
result = results.get() # -waiting for the previous .put() to .get() first.
# Add a poison pill for each consumer
for i in range(num_consumers): # We only do this when all computation is done.
tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
此代码只是一个示例。在此代码的其他变体中:当实现 tasks.put() 和 results.get() 的更多迭代时,需要一种方法来使入队的 Task(object) 在完全计算答案之前通过外部调用返回并自行返回。如果您已经从该单个拆分号码组的“其他进程”之一获得答案,这将释放资源。描述__call__
符需要存在,Task(object) 才能作为调用 tasks.put(Task(i, 100)) 的函数工作。在过去的两周里,我一直在试图找出一种有效的方法来做到这一点。我需要采取完全不同的方法吗?不要误解我的困境,我正在使用有效的代码,但没有我在 Microsoft Windslows 上想要的效率。任何帮助将不胜感激。
Task(object) 与将它入队的 Consumer() 进程不存在于同一进程中吗?如果是这样,我不能告诉 Class Consumer() Run() 的所有进程停止他们当前正在运行的 Task(object) 而不关闭他们的 while 循环(使用毒丸),这样他们就可以立即接受另一个 Task() 而无需是否需要再次关闭并重新打开他们的流程?当您为迭代计算打开和关闭数千个进程时,它确实会增加并浪费时间。我曾尝试使用 Events() Managers() 其他 Queues()。似乎没有一种有效的方法可以立即从外部干预任务(对象)return
到它的父 Consumer() 以便它不会继续浪费资源计算如果其他 Consumers() 之一返回的答案使其他 Consumer() 任务的计算无关紧要,因为它们都作为单个数字的统一计算工作分成小组。
解决方案
您所做的是实现了自己的多处理池,但为什么呢?您是否不知道concurrent.futures.ProcessPoolExecutor
和multiprocessing.pool.Pool
类的存在,后者实际上更适合您的特定问题?
这两个类都实现了多处理池和用于将任务提交到池并从这些任务中返回结果的各种方法。但是,由于在您的特定情况下,您提交的任务正在尝试解决相同的问题,并且您只对第一个可用结果感兴趣,一旦完成,您需要能够终止任何剩余的正在运行的任务。只multiprocessing.pool.Pool
允许你这样做。
以下代码使用方法Pool.apply_async
提交任务。此函数不会阻塞,而是返回一个AsyncResult
具有阻塞get
方法的实例,您可以调用该方法从提交的任务中获取结果。但是,由于通常您可能会提交许多任务,我们不知道要调用这些实例中的哪一个get
。因此,解决方案是改为使用指定函数的callback
参数,apply_async
该函数将在可用时使用任务的返回值异步调用。然后问题就变成了传达这个结果。有两种方法:
方法一:通过全局变量
from multiprocessing import Pool
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(answer):
global solution
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
solution = answer
pool.terminate() # kill all tasks
if __name__ == '__main__':
t = time.time()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for all tasks to terminate:
pool.close()
pool.join()
print(solution)
print('Total elapsed time:', time.time() - t)
印刷:
9
Total elapsed time: 1.1378364562988281
方法2:通过队列
from multiprocessing import Pool
from queue import Queue
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(solution):
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
q.put_nowait(solution)
if __name__ == '__main__':
t = time.time()
q = Queue()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for first returned result from callback:
solution = q.get()
print(solution)
pool.terminate() # kill all tasks in the pool
print('Total elapsed time:', time.time() - t)
印刷:
9
Total elapsed time: 1.1355643272399902
更新
即使在 Windows 下,与任务完成所需的时间相比,创建和重新创建池的时间也可能相对微不足道,尤其是对于以后的迭代,即较大的n
. 如果您正在调用相同的工作函数,那么第三种方法是使用 pool method imap_unordered
。我还包括一些代码来衡量我的桌面启动新池实例的开销是多少:
from multiprocessing import Pool
import time
def worker(x):
time.sleep(x) # emulate working on the problem
return 9 # the solution
if __name__ == '__main__':
# POOLSIZE = multiprocessing.cpu_count()
POOLSIZE = 8 # on my desktop
# how long does it take to start a pool of size 8?
t1 = time.time()
for i in range(16):
pool = Pool(POOLSIZE)
pool.terminate()
t2 = time.time()
print('Average pool creation time: ', (t2 - t1) / 16)
# POOLSIZE number of calls:
arguments = [7, 6, 1, 3, 4, 2, 9, 6]
pool = Pool(POOLSIZE)
t1 = time.time()
results = pool.imap_unordered(worker, arguments)
it = iter(results)
first_solution = next(it)
t2 = time.time()
pool.terminate()
print('Total elapsed time:', t2 - t1)
print(first_solution)
印刷:
Average pool creation time: 0.053139880299568176
Total elapsed time: 1.169790506362915
9
更新 2
这是一个难题:您有多个进程在处理一个难题。例如,一旦一个进程发现一个数字可以被通过范围内的数字之一整除,那么在其他进程中测试不同范围内的数字以完成其测试就没有意义了。你可以做三件事之一。您可以什么都不做,让流程在开始下一次迭代之前完成。但这会延迟下一次迭代。我已经建议您终止进程,从而释放处理器。但这需要您创建新的流程,而您认为这些流程并不令人满意。
我只能想到另一种可能性,我在下面使用您的多处理方法介绍了这种可能性。一个名为的多处理共享内存变量stop
被初始化为每个进程作为一个全局变量,并在每次迭代之前设置为 0。当一个任务被设置为返回值 0 并且在其他进程中运行的其他任务没有任何意义时,它会将值设置stop
为 1。这意味着任务必须定期检查 的值,stop
如果已设置则返回到 1。当然,这会为处理增加额外的周期。在下面的演示中,我实际上有 100 个任务排队等待 8 个处理器。但是最后 92 个任务将立即发现stop
已设置并应在第一次迭代时返回。
顺便说一句:原始代码使用一个multiprocessing.JoinableQueue
实例来对任务进行排队,而不是在该实例上进行multiprocessing.Queue
调用,task_done
因为消息已从队列中取出。然而,join
从来没有在这个队列上进行过调用(它会告诉你什么时候所有的消息都被删除了),从而违背了拥有这样一个队列的全部目的。事实上,不需要 a ,JoinableQueue
因为主进程已经提交了num_jobs
作业并且期待num_jobs
结果队列上的消息,并且可以循环并从结果队列中提取预期数量的结果。我已经用一个简单Queue
的方法代替了JoinableQueue
保留原始代码但注释掉的方法。此外,这些Consumer
进程可以创建为守护进程(带参数daemon=True
) 然后它们会在所有非守护进程(即主进程)终止时自动终止,从而无需使用特殊的“毒丸”None
任务消息。我已经进行了更改,并再次保持原始代码不变,但将其注释掉以进行比较。
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue, stop):
# make ourself a daemon process:
multiprocessing.Process.__init__(self, daemon=True)
self.task_queue = task_queue
self.result_queue = result_queue
self.stop = stop
def run(self):
global stop
stop = self.stop
while True:
next_task = self.task_queue.get()
"""
if next_task is None:
# Poison pill shutdown of .get() loop with break
#self.task_queue.task_done()
break
"""
answer = next_task()
#self.task_queue.task_done()
self.result_queue.put(answer)
# return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
global stop
# start the range from 1 to avoid dividing by 0:
for i in range(1, self.b):
# how frequently should this check be made?
if stop.value == 1:
return 0
if self.a % i == 0:
stop.value = 1
return 0
return 1
if __name__ == '__main__':
# Establish communication queues
#tasks = multiprocessing.JoinableQueue()
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
# Number of consumers equal to system cpu_count
num_consumers = multiprocessing.cpu_count()
# Make a list of Consumer object process' ready to be opened.
stop = multiprocessing.Value('i', 0)
consumers = [ Consumer(tasks, results, stop) for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
# many more jobs than processes, but they will stop immediately once they check the value of stop.value:
num_jobs = 100
stop.value = 0 # make sure it is 0 before an iteration
for i in range(num_jobs):
tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.
# We start to .get() the results in a different loop-
results = [results.get() for _ in range(num_jobs)]
print(results)
print(0 in results)
"""
# Add a poison pill for each consumer
for i in range(num_consumers): # We only do this when all computation is done.
tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
"""
印刷:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
True
推荐阅读
- css - 如果 SCSS 中对具有属性的元素的语句等于 true?
- loops - .htaccess mod_rewrite 未按预期工作(重定向到文件夹)
- google-cloud-platform - Velostrata 从本地迁移到 GCP。错误日志:echo boot 失败,等待磁盘联机
- python - 检查数据框值 +/- 1 是否存在于给定列中的其他任何位置
- aws-lambda - 来自 Api Gatway 的带参数的异步 Lambda 调用
- roslyn - Roslyn - 将编辑的文档保存到物理解决方案
- android - 在传递依赖项之前,我怎么能不能构造它们?
- python - Python - 在 Django 2 中创建多种用户类型和电子邮件作为用户名字段
- python - 在 PyWeka 中使用包(Python Weka Wrapper)
- c# - 以编程方式,如何指定要放置表 C# 的数据库