首页 > 解决方案 > multiprocessing.Pool 未在可迭代的最后一个元素上运行

问题描述

我正在尝试运行一个函数,该函数func将索引列表作为参数并处理数据。

def func(rng):
    **some processing**
    write_csv_to_disk(processed_data[rng],mode="a")


import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,list_of_lists_of_indices)
pool.close()

该函数将DataFrame[indices] 并行处理的部分保存到append模式中的文件中。list_of_lists_of_indices除最后一个列表外,该代码对于 的所有子列表都运行良好。最后一个列表中索引的数据未保存到我的文件中,并且池已关闭。

list_of_lists_of_indices = [[0,1,2,3,4,.....,99999],[100000,100001,100002,100003,100004,......,199999],.....,[10000000,10000001,...,100000895]]
import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,iterable = list_of_lists_of_indices)
pool.close()

标签: pythonpython-multiprocessing

解决方案


好吧,您不是说做什么write_csv_to_disk,但是这里似乎存在一些可能的问题:

  1. 您有多个进程同时写入同一个文件,除非您采取特定步骤(例如锁定文件)以避免它们相互覆盖,否则这真的不能顺利进行
  2. 您正在解释的症状看起来很像您没有正确关闭文件对象,依靠垃圾收集器来执行此操作并关闭缓冲区,除非在最后一次迭代中,例如工作人员可能在 GC 运行之前死亡,因此文件没有关闭,它的缓冲区也没有刷新到磁盘
  3. 同样,虽然 a 的结果Pool.map有序的(代价高昂),但不能保证它们将以什么顺序执行。由于是工作人员在写入磁盘,因此没有理由订购这些。我什至不明白你为什么要使用mapmap 的全部意义在于返回计算结果,而你在这里没有这样做

    您不应该使用 Pool.map,也不应该“以附加模式保存到文件”。

另请注意,这Pool.close意味着您不会将新工作交给池,它不会等待工作人员完成。现在从理论上讲,如果您只使用同步方法,这应该无关紧要,但是在这种情况下并给定(2),这可能是一个问题:当父进程退出时,池可能会被垃圾收集,这意味着它会硬关闭泳池工人


推荐阅读