python - Tensorflow 的 Python 多处理工作队列
问题描述
Tensorflow 可以运行很长时间的训练会话,并且可能会在页面刷新之间使服务器崩溃。
我希望我可以使用先前答案中的以下内容来解决此问题:
我有一个必须运行 12 次的 python 函数。我目前已设置为使用多处理库中的 Pool 来并行运行所有这些库。通常我一次运行 6 个,因为该函数是 CPU 密集型的,并且并行运行 12 个通常会导致程序崩溃。当我们一次做 6 个时,在前 6 个过程全部完成之前,第二组 6 个不会开始。理想情况下,我们希望在最初一批 6 个中的一个完成后立即启动另一个(例如第 7 个)——这样 6 个同时运行,同时还有更多的启动。现在代码看起来像这样(它会被调用两次,将前 6 个元素传递到一个列表中,然后将第二个 6 传递到另一个列表中:
from multiprocessing import Pool
def start_pool(project_list):
pool = Pool(processes=6)
pool.map(run_assignments_parallel,project_list)
所以我一直在尝试实施工人/队列解决方案并遇到了一些问题。我有一个看起来像这样的工作函数:
def worker(work_queue, done_queue):
try:
for proj in iter(work_queue.get, 'STOP'):
print proj
run_assignments_parallel(proj)
done_queue.put('finished ' + proj )
except Exception, e:
done_queue.put("%s failed on %s with: %s" % (current_process().name, proj, e.message))
return True
调用worker函数的代码如下:
workers = 6
work_queue = Queue()
done_queue = Queue()
processes = []
for project in project_list:
print project
work_queue.put(project)
for w in xrange(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
for p in processes:
p.join()
done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):
print status
project_list
只是需要在函数中运行的 12 个项目的路径列表run_assignments_parallel
。
现在这样写,该函数被多次调用同一个进程(项目),我真的不知道发生了什么。这段代码基于我找到的一个例子,我很确定循环结构搞砸了。任何帮助都会很棒,我为我对此事的无知表示歉意。谢谢!
这似乎运行路径,比如里面有 python 的文件名,我希望把它放在我的 django 服务器的视图路径中,并让它通过 tensorflow 培训课程运行
解决方案
这是一个由您的代码片段组成的最小运行示例。由于不推荐使用 Python 2,因此我将您的代码转换为 Python 3,并且不应使用此版本编写新代码。
from multiprocessing import Process
from multiprocessing.process import current_process
from queue import Queue
def worker(work_queue, done_queue):
proj=None
try:
for proj in iter(work_queue.get, 'STOP'):
print(current_process().name, "#", proj)
done_queue.put('finished ' + proj )
except Exception as e:
done_queue.put("%s failed on %s with: %s" % (current_process().name, proj, e.args))
print(current_process().name, "#", "Quit")
return True
def main():
workers = 6
work_queue = Queue()
done_queue = Queue()
processes = []
project_list = list("ab")
for project in project_list:
work_queue.put(project)
for w in range(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
for p in processes:
p.join()
done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):
print(status)
if __name__ == '__main__':
main()
Process-1 # a
Process-1 # b
Process-2 # a
Process-2 # b
Process-2 # Quit
Process-3 # a
Process-3 # b
Process-3 # Quit
Process-4 # a
Process-4 # b
Process-4 # Quit
Process-5 # a
Process-5 # b
Process-5 # Quit
Process-6 # a
Process-6 # b
Process-6 # Quit
如您所见,每个项目都在每个工人中处理。然而,主进程在加入进程时挂起,因为Process-1
永远不会终止......我认为这是由于iter(work_queue.get, 'STOP')
调用,我不明白你试图在那里实现什么。
你使用队列的方式对我来说似乎很奇怪。我建议您更深入地阅读queue
模块文档,特别是task_done
必须与get
.
最后,您为什么不坚持Pool
我认为最适合您的问题的解决方案。Pool 将自动处理等待队列,并确保尽可能多的 (6) 个工作人员同时运行。
推荐阅读
- powershell - Powershell:具有多个参数的选项
- itext7 - 有什么方法可以从 PDF 中提取整个矩形,而无需在 c# 或 VB.net 中使用 itext7 逐行提取
- docker - docker nginx没有加载css样式
- java - 插入更多行时更新表格布局内容
- r - RMarkdown - DOCX 中的混合横向 + 纵向页面与参考 Docx
- javascript - 基于过滤器组合的过滤器阵列
- node.js - 如何在 Digital Ocean 应用平台启用 fastify 的 http2
- laravel - 如何在 Laravel 应用程序中向电子邮件添加附件?
- json - 如何将 json 文件从 labelme 接口转换为 png 或图像格式文件?
- python - 在 python wheel 中包含依赖信息