python - python中的多处理-完成后进程未关闭
问题描述
我在python中有一个进程池,它可以正常启动进程,但是,我刚刚意识到这些进程在完成后没有关闭(我知道它们完成了,因为最后一个语句是文件写入)。在代码下方,带有示例函数 ppp:
from multiprocessing import Pool
import itertools
def ppp(element):
window,day = element
print(window,day)
time.sleep(10)
if __name__ == '__main__': ##The line marked
print('START')
start_time = current_milli_time()
days = ['0808', '0810', '0812', '0813', '0814', '0817', '0818', '0827']
windows = [1000,2000,3000,4000,5000,10000,15000, 20000,30000,60000,120000,180000]
processes_args = list(itertools.product(windows, days))
pool = Pool(8)
results = pool.map(ppp, processes_args)
pool.close()
pool.join()
print('END', current_milli_time()-start_time)
我正在使用 Linux,Ubuntu 16.04。在我添加示例中标记的行之前,一切正常。我想知道这种行为是否与缺少 return 语句有关。无论如何,这就是我的“htop”: 如您所见,没有关闭任何进程,但所有进程都已完成工作。
我发现了相关问题:Python Multiprocessing pool.close() and join() does not close processes,但是,我不明白这个问题的解决方案是否是使用 map_async 而不是 map。
编辑:真正的功能代码:
def process_day(element):
window,day = element
noise = 0.2
print('Processing day:', day,', window:', window)
individual_files = glob.glob('datan/'+day+'/*[0-9].csv')
individual = readDataset(individual_files)
label_time = individual.loc[(individual['LABEL_O'] != -2) | (individual['LABEL_F'] != -2), 'TIME']
label_time = list(np.unique(list(label_time)))
individual = individual[individual['TIME'].isin(label_time)]
#Saving IDs for further processing
individual['ID'] = individual['COLLAR']
#Time variable in seconds for aggregation and merging
individual['TIME_S'] = individual['TIME'].copy()
noise_x = np.random.normal(0,noise,len(individual))
noise_y = np.random.normal(0,noise,len(individual))
noise_z = np.random.normal(0,noise,len(individual))
individual['X_AXIS'] = individual['X_AXIS'] + noise_x
individual['Y_AXIS'] = individual['Y_AXIS'] + noise_y
individual['Z_AXIS'] = individual['Z_AXIS'] + noise_z
#Time syncronization (applying milliseconds for time series processing)
print('Time syncronization:')
with progressbar.ProgressBar(max_value=len(individual.groupby('ID'))) as bar:
for baboon,df_baboon in individual.groupby('ID'):
times = list(df_baboon['TIME'].values)
d = Counter(times)
result = []
for timestamp in np.unique(times):
for i in range(0,d[timestamp]):
result.append(str(timestamp+i*1000/d[timestamp]))
individual.loc[individual['ID'] == baboon,'TIME'] = result
bar.update(1)
#Time series process
ts_process = time_series_processing(window, 'TIME_S', individual, 'COLLAR', ['COLLAR', 'TIME', 'X_AXIS','Y_AXIS','Z_AXIS'])
#Aggregation and tsfresh
ts_process.do_process()
individual = ts_process.get_processed_dataframe()
individual.to_csv('noise2/processed_data/'+str(window)+'/agg/'+str(day)+'.csv', index = False)
#NEtwork inference process
ni = network_inference_process(individual, 'TIME_S_mean')
#Inference
ni.do_process()
final = ni.get_processed_dataframe()
final.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'.csv', index = False)
#Saving not aggregated ground truth
ground_truth = final[['ID_mean', 'TIME_S_mean', 'LABEL_O_values', 'LABEL_F_values']].copy()
#Neighbor features process
neighbors_features_f = ni.get_neighbor_features(final, 'TIME_S_mean', 'ID_mean')
neighbors_features_f = neighbors_features_f.drop(['LABEL_O_values_n', 'LABEL_F_values_n'], axis=1)
neighbors_features_f.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'_neigh.csv', index = False)
# Final features dataframe
final_neigh = pd.merge(final, neighbors_features_f, how='left', left_on=['TIME_S_mean','ID_mean'], right_on = ['TIME_S_mean_n','BABOON_NODE_n'])
final_neigh.to_csv('noise2/processed_data/'+str(window)+'/complete/'+str(day)+'.csv', index = False)
return
如您所见,最后一条语句是写入文件,它由所有进程执行,我实际上并不认为问题出在这个函数内部。
解决方案
推荐阅读
- google-bigquery - 需要帮助查询热门帖子的热门 reddit 评论
- xml - 如何复制一个元素(文本字符串)的内容并附加到另一个元素的内容?(XML)
- python - 具有唯一约束的 INSERT 或 IGNORE 不起作用
- openseadragon - 最新的库版本显示图像边框
- qt - 无法在 QML 中设置文本字段以填充宽度
- javascript - 使用 AJAX 更新返回错误并直接进入路由显示“MethodNotAllowedHttpException No message”
- r - 重命名文件,使其包含日期
- angular - 我可以在 Ionic/Angular 的 HTML 模板中访问当前组件吗?
- node.js - Google Actions MediaResponse 回调在 iPhone Google Assistant 应用程序上不起作用,在模拟器和 Google Home Mini 上工作
- android - 我无法读取/写入 NFC 标签 (NfcV)