首页 > 解决方案 > 当通过管道传递大型数组时,Python 多处理卡住了

问题描述

我在python中使用多处理并尝试通过管道将一个大的numpy数组传递给一个子进程。它适用于小数组,但会挂起较大的数组而不会返回错误。

我相信管道被阻塞并且已经阅读了一些关于它但无法弄清楚如何解决问题的方法。

def f2(conn, x):
    conn.start()
    data = conn.recv()
    conn.join()

    print(data)
    do_something(x)

    conn.close()

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    parent_conn, child_conn = Pipe()

    p = multiprocessing.Pool(processes=8)      
    func = partial(f2, child_conn)

    parent_conn.send(data_input)
    parent_conn.close()

    result = p.map(func, processes)

    p.close()
    p.join()

标签: pythonmultiprocessingpipe

解决方案


忽略此代码中的所有其他问题(您没有x传递给map,您不使用接收x f2,混合通常是错误的做法),您的最终问题是在工作人员之前执行的阻塞调用进程可以从中读取。Pool.mapPipesend

假设您真的想与 混合mapPipe解决方案是在开始之前map异步启动 ,因此在父尝试写入时从另一侧读取一些内容:sendPipe

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    parent_conn, child_conn = Pipe()

    # Use with to avoid needing to explicitly close/join
    with multiprocessing.Pool(processes=8) as p:
        func = partial(f2, child_conn)

        # Launch async map to ensure workers are running
        future = p.map_async(func, x)

        # Can perform blocking send as workers will consume as you send
        parent_conn.send(data_input)
        parent_conn.close()

        # Now you can wait on the map to complete
        result = future.get()

如前所述,由于 的问题,此代码不会运行x,即使它运行了,Pipe文档也明确警告不应同时读取两个不同的进程Pipe

如果您想在单个工作人员中批量处理数据,您只需使用Processand Pipe,例如:

def f2(conn):
    data = conn.recv()
    conn.close()
    print(data)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()

    proc = multiprocessing.Process(target=f2, args=(child_conn,))
    proc.start()

    data_input = read_data()    # large numpy array
    parent_conn.send(data_input)
    parent_conn.close()

    proc.join()

如果您想跨多个工作人员分别处理每个元素,您只需使用Pooland map

def f2(x):
    print(x)

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    with multiprocessing.Pool(processes=8) as p:   
        result = p.map(f2, data_input)

推荐阅读