Multiprocessing in Python - processes do not close after completing

Problem description

I have a process pool in Python that starts its processes normally. However, I just realized that these processes do not close after completing (I know they completed, because the last statement is a file write). Below is the code, with an example function ppp:

from multiprocessing import Pool
import itertools
import time

# Assumed helper from the original code (definition not shown in the question):
# returns the current time in milliseconds
current_milli_time = lambda: int(round(time.time() * 1000))

def ppp(element):
    window, day = element
    print(window, day)
    time.sleep(10)

if __name__ == '__main__':  ## The line marked
    print('START')
    start_time = current_milli_time()
    days = ['0808', '0810', '0812', '0813', '0814', '0817', '0818', '0827']
    windows = [1000, 2000, 3000, 4000, 5000, 10000, 15000, 20000, 30000, 60000, 120000, 180000]
    processes_args = list(itertools.product(windows, days))
    pool = Pool(8)
    results = pool.map(ppp, processes_args)
    pool.close()
    pool.join()
    print('END', current_milli_time() - start_time)

I am using Linux, Ubuntu 16.04. Everything worked fine before I added the marked line in the example. I wonder whether this behavior is related to the missing return statement. In any case, here is my htop: (htop screenshot showing the pool workers) As you can see, none of the processes have closed, even though all of them have finished their work.
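
To check the missing-return hypothesis, here is a minimal sketch (my addition, not from the original question): a worker function without an explicit return simply returns None, and pool.map still completes and joins normally, so the absence of a return statement by itself cannot keep workers alive:

from multiprocessing import Pool

def no_return(x):
    x * 2  # computes a value but never returns it

if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(no_return, [1, 2, 3]))  # prints [None, None, None]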

I found a related question: Python Multiprocessing pool.close() and join() does not close processes. However, I do not understand whether the solution there is to use map_async instead of map.
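
For reference, a minimal sketch of the map_async variant suggested in that question, applied to the example above (my addition; unlike map, map_async returns immediately with an AsyncResult instead of blocking):

from multiprocessing import Pool
import itertools
import time

def ppp(element):
    window, day = element
    print(window, day)
    time.sleep(10)

if __name__ == '__main__':
    processes_args = list(itertools.product([1000, 2000], ['0808', '0810']))
    pool = Pool(8)
    async_result = pool.map_async(ppp, processes_args)  # returns an AsyncResult immediately
    pool.close()                  # no further tasks may be submitted
    pool.join()                   # block until every worker has exited
    results = async_result.get()  # fetch results; re-raises any worker exception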

EDIT: the actual function's code:

import glob
import numpy as np
import pandas as pd
import progressbar
from collections import Counter
# readDataset, time_series_processing and network_inference_process are
# project-specific helpers defined elsewhere in my code.

def process_day(element):
    window, day = element
    noise = 0.2
    print('Processing day:', day, ', window:', window)
    individual_files = glob.glob('datan/'+day+'/*[0-9].csv')
    individual = readDataset(individual_files)
    label_time = individual.loc[(individual['LABEL_O'] != -2) | (individual['LABEL_F'] != -2), 'TIME']
    label_time = list(np.unique(list(label_time)))
    individual = individual[individual['TIME'].isin(label_time)]
    #Saving IDs for further processing
    individual['ID'] = individual['COLLAR']
    #Time variable in seconds for aggregation and merging
    individual['TIME_S'] = individual['TIME'].copy()
    noise_x = np.random.normal(0, noise, len(individual))
    noise_y = np.random.normal(0, noise, len(individual))
    noise_z = np.random.normal(0, noise, len(individual))
    individual['X_AXIS'] = individual['X_AXIS'] + noise_x
    individual['Y_AXIS'] = individual['Y_AXIS'] + noise_y
    individual['Z_AXIS'] = individual['Z_AXIS'] + noise_z
    #Time synchronization (applying milliseconds for time series processing)
    print('Time synchronization:')
    with progressbar.ProgressBar(max_value=len(individual.groupby('ID'))) as bar:
        for baboon, df_baboon in individual.groupby('ID'):
            times = list(df_baboon['TIME'].values)
            d = Counter(times)
            result = []
            #Spread duplicate timestamps evenly across their second,
            #e.g. two rows at timestamp t become t and t+500
            for timestamp in np.unique(times):
                for i in range(0, d[timestamp]):
                    result.append(str(timestamp + i*1000/d[timestamp]))
            individual.loc[individual['ID'] == baboon, 'TIME'] = result
            bar.update(1)

    #Time series process
    ts_process = time_series_processing(window, 'TIME_S', individual, 'COLLAR', ['COLLAR', 'TIME', 'X_AXIS','Y_AXIS','Z_AXIS'])
    #Aggregation and tsfresh
    ts_process.do_process()
    individual = ts_process.get_processed_dataframe()
    individual.to_csv('noise2/processed_data/'+str(window)+'/agg/'+str(day)+'.csv', index=False)
    #Network inference process
    ni = network_inference_process(individual, 'TIME_S_mean')
    #Inference
    ni.do_process()
    final = ni.get_processed_dataframe()
    final.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'.csv', index=False)
    #Saving not aggregated ground truth
    ground_truth = final[['ID_mean', 'TIME_S_mean', 'LABEL_O_values', 'LABEL_F_values']].copy()
    #Neighbor features process
    neighbors_features_f = ni.get_neighbor_features(final, 'TIME_S_mean', 'ID_mean')
    neighbors_features_f = neighbors_features_f.drop(['LABEL_O_values_n', 'LABEL_F_values_n'], axis=1)
    neighbors_features_f.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'_neigh.csv', index=False)
    #Final features dataframe
    final_neigh = pd.merge(final, neighbors_features_f, how='left', left_on=['TIME_S_mean','ID_mean'], right_on=['TIME_S_mean_n','BABOON_NODE_n'])
    final_neigh.to_csv('noise2/processed_data/'+str(window)+'/complete/'+str(day)+'.csv', index=False)
    return

As you can see, the last statement is a file write, and it is executed by all the processes; I don't actually think the problem lies inside this function.
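
One way to verify whether the workers really exit (a diagnostic sketch I added, not part of the original post) is to record each worker's PID and check multiprocessing.active_children() after the pool has been joined; it should be empty once all workers have terminated:

from multiprocessing import Pool, active_children
import os
import time

def worker(x):
    time.sleep(1)
    return os.getpid()  # report which worker process handled this task

if __name__ == '__main__':
    pool = Pool(4)
    pids = pool.map(worker, range(8))
    pool.close()
    pool.join()
    print('worker PIDs:', sorted(set(pids)))
    print('still alive:', active_children())  # expected: [] after join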

Tags: python, python-multiprocessing, python-pool

Solution

