首页 > 解决方案 > Python 多处理 - Pool.map 的结果为无

问题描述

我正在努力让我的多处理启动并运行。下面是我的代码示例。

import pandas as pd
import networkx as nx
import ast
import numpy as np
import itertools
import glob
from multiprocessing import Pool as ProcessPool

def network_inference(year)   
   # some code that generate a net and a df_complete dataframe
   res = pd.Series()
   res['YEAR'] = year
   res['NODES'] = net.number_of_nodes()
   res['EDGES'] = net.number_of_edges()

   return (res, df_complete['VENUE_CLASS'].value_counts())


if __name__=='__main__':
    years = [x.replace('\\','/').replace('data_of_interest.csv','') for x in glob.glob('data/raw/*/data_of_interest.csv')][:10]
    pool = ProcessPool(50) 
    results = pool.map_async(network_inference, years)
    pool.close() 
    pool.join()

    results.wait()

    l = results.get()

    structure = pd.DataFrame()
    for s1,s2 in l:
        structure = structure.append(s1.append(s2), ignore_index=True)

    structure.to_csv('data/structure_data_features.csv', index = False)

如您所见,非常简单,一个文件列表上的映射函数。每个进程返回一个元组。所以问题发生在我打电话时result.get()。当代码运行时,我收到以下错误:

Traceback (most recent call last):

  File "/AD-HOME/------/Desktop/dblp/network_inference.py", line 132, in <module>
    for s1,s2 in l:

TypeError: 'NoneType' object is not iterable

但是,如果稍后在错误之后我results.get()在 Ipython 终端(我正在使用 Spyder)中询问结果(),我得到了正确的输出。我想发布一个完整的mwe,但是,我无法使用更简单的代码重现此错误,也无法发布原始代码。

我正在寻找解决此问题的想法,我知道在调用时map我需要等待它计算所有内容(我相信我正在使用join()然后使用results.wait())。

标签: pythonmultiprocessing

解决方案


我建议使用以下方法管理上下文with

if __name__=='__main__':
    years = [x.replace('\\','/').replace('data_of_interest.csv','') for x in glob.glob('data/raw/*/data_of_interest.csv')][:10]
    with ProcessPool(50) as pool:
        results = pool.map_async(network_inference, years)
        l = results.get()

        structure = pd.DataFrame()
        for s1,s2 in l:
            structure = structure.append(s1.append(s2), ignore_index=True)

        structure.to_csv('data/structure_data_features.csv', index = False)

您可以删除closejoin使用此方法,然后您需要在离开ProcessPool. 当您close和时join,您的结果会在您访问它们之前被删除。

您也不需要wait在这里,因为您.get()立即使用之后已经等待您的结果。


推荐阅读