首页 > 解决方案 > Tensorflow 的 Python 多处理工作队列

问题描述

Tensorflow 可以运行很长时间的训练会话,并且可能会在页面刷新之间使服务器崩溃。

我希望我可以使用先前答案中的以下内容来解决此问题:

我有一个必须运行 12 次的 python 函数。我目前已设置为使用多处理库中的 Pool 来并行运行所有这些库。通常我一次运行 6 个,因为该函数是 CPU 密集型的,并且并行运行 12 个通常会导致程序崩溃。当我们一次做 6 个时,在前 6 个过程全部完成之前,第二组 6 个不会开始。理想情况下,我们希望在最初一批 6 个中的一个完成后立即启动另一个(例如第 7 个)——这样 6 个同时运行,同时还有更多的启动。现在代码看起来像这样(它会被调用两次,将前 6 个元素传递到一个列表中,然后将第二个 6 传递到另一个列表中:

from multiprocessing import Pool

def start_pool(project_list):

    pool = Pool(processes=6)
    pool.map(run_assignments_parallel,project_list)

所以我一直在尝试实施工人/队列解决方案并遇到了一些问题。我有一个看起来像这样的工作函数:

def worker(work_queue, done_queue):
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print proj
            run_assignments_parallel(proj)
            done_queue.put('finished ' + proj )
    except Exception, e:        
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj,        e.message))
    return True

调用worker函数的代码如下:

workers = 6
work_queue = Queue()
done_queue = Queue()  
processes = []
for project in project_list:
    print project
    work_queue.put(project)
for w in xrange(workers):        
    p = Process(target=worker, args=(work_queue, done_queue))
    p.start()
    processes.append(p)
    work_queue.put('STOP')
for p in processes:
     p.join()    
     done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):        
    print status

project_list只是需要在函数中运行的 12 个项目的路径列表run_assignments_parallel

现在这样写,该函数被多次调用同一个进程(项目),我真的不知道发生了什么。这段代码基于我找到的一个例子,我很确定循环结构搞砸了。任何帮助都会很棒,我为我对此事的无知表示歉意。谢谢!


这似乎运行路径,比如里面有 python 的文件名,我希望把它放在我的 django 服务器的视图路径中,并让它通过 tensorflow 培训课程运行

标签: python

解决方案


这是一个由您的代码片段组成的最小运行示例。由于不推荐使用 Python 2,因此我将您的代码转换为 Python 3,并且不应使用此版本编写新代码。

from multiprocessing import Process
from multiprocessing.process import current_process
from queue import Queue


def worker(work_queue, done_queue):
    proj=None
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print(current_process().name, "#", proj)
            done_queue.put('finished ' + proj )
    except Exception as e:
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj, e.args))
    print(current_process().name, "#", "Quit")
    return True

def main():
    workers = 6
    work_queue = Queue()
    done_queue = Queue()
    processes = []
    project_list = list("ab")
    for project in project_list:
        work_queue.put(project)
    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')
    for p in processes:
        p.join()
        done_queue.put('STOP')
    for status in iter(done_queue.get, 'STOP'):
        print(status)

if __name__ == '__main__':
    main()
Process-1 # a
Process-1 # b
Process-2 # a
Process-2 # b
Process-2 # Quit
Process-3 # a
Process-3 # b
Process-3 # Quit
Process-4 # a
Process-4 # b
Process-4 # Quit
Process-5 # a
Process-5 # b
Process-5 # Quit
Process-6 # a
Process-6 # b
Process-6 # Quit

如您所见,每个项目都在每个工人中处理。然而,主进程在加入进程时挂起,因为Process-1永远不会终止......我认为这是由于iter(work_queue.get, 'STOP')调用,我不明白你试图在那里实现什么。

你使用队列的方式对我来说似乎很奇怪。我建议您更深入地阅读queue模块文档,特别是task_done必须与get.

最后,您为什么不坚持Pool我认为最适合您的问题的解决方案。Pool 将自动处理等待队列,并确保尽可能多的 (6) 个工作人员同时运行。


推荐阅读