首页 > 解决方案 > 仅使用单个线程而不是多个线程进行多处理

问题描述

这个问题最近被问过并解决了几次,但我有一个非常具体的例子......

我有一个多处理功能,昨天在完全隔离的情况下工作得非常好(在交互式笔记本中),但是,我决定参数化,以便我可以将它作为更大管道的一部分并用于抽象/更清洁的笔记本,现在它只使用单线程而不是6。

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
mp.set_start_method('forkserver')


def multiprocess_function(func, iterator, input_data):
    result_list = []

    def append_result(result):
        result_list.append(result)

    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args = (i, input_data), callback = append_result)
        pool.close()
        pool.join()

    return result_list
multiprocess_function(count_live, run_weeks, base_df)

我以前版本的代码执行方式不同,而不是返回/调用,我在函数底部使用以下内容(现在我已经参数化了它根本不起作用 - 即使分配了参数)

if __name__ == '__main__':
    multiprocess_function()

该函数执行良好,仅根据顶部的输出在一个线程上运行。

抱歉,如果这是非常简单的事情-我不是程序员,我是分析师:)

编辑:如果我在函数底部包含 if__name__ ==' main ': 等并执行单元格,那么一切都工作得很好,但是,当我这样做时,我必须删除参数 - 也许只是与范围界定有关。如果我通过调用函数来执行,无论它是否参数化,它都只在单个线程上运行。

标签: pythonpython-3.xmultiprocessingpython-multiprocessing

解决方案


你有两个问题:

  1. 您没有使用导入防护。

  2. 您没有在导入防护中设置默认启动方法。

在它们两者之间,您最终告诉 Python 在 forkserver 中生成 forkserver,这只会让您感到悲伤。将代码结构更改为:

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context


def multiprocess_function(func, iterator, input_data):
    result_list = []
    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args=(i, input_data), callback=result_list.append)
        pool.close()
        pool.join()

    return result_list

if __name__ == '__main__':
    mp.set_start_method('forkserver')
    multiprocess_function(count_live, run_weeks, base_df)

由于您没有显示您从哪里获得和从哪里获得的count_live,我只想说对于编写的代码,它们应该在受保护的部分中定义(因为没有任何东西依赖它们作为全局变量)。run_weeksbase_df

还有其他改进(apply_async使用的方式让我觉得你真的只想列出 的结果pool.imap_unordered,没有显式循环),但这是解决会破坏使用spawnforkserver启动方法的大问题。


推荐阅读