python - multiprocessing.Pool 未在可迭代的最后一个元素上运行
问题描述
我正在尝试运行一个函数,该函数func
将索引列表作为参数并处理数据。
def func(rng):
**some processing**
write_csv_to_disk(processed_data[rng],mode="a")
import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,list_of_lists_of_indices)
pool.close()
该函数将DataFrame[indices]
并行处理的部分保存到append
模式中的文件中。list_of_lists_of_indices
除最后一个列表外,该代码对于 的所有子列表都运行良好。最后一个列表中索引的数据未保存到我的文件中,并且池已关闭。
list_of_lists_of_indices = [[0,1,2,3,4,.....,99999],[100000,100001,100002,100003,100004,......,199999],.....,[10000000,10000001,...,100000895]]
import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,iterable = list_of_lists_of_indices)
pool.close()
解决方案
好吧,您不是说做什么write_csv_to_disk
,但是这里似乎存在一些可能的问题:
- 您有多个进程同时写入同一个文件,除非您采取特定步骤(例如锁定文件)以避免它们相互覆盖,否则这真的不能顺利进行
- 您正在解释的症状看起来很像您没有正确关闭文件对象,依靠垃圾收集器来执行此操作并关闭缓冲区,除非在最后一次迭代中,例如工作人员可能在 GC 运行之前死亡,因此文件没有关闭,它的缓冲区也没有刷新到磁盘
同样,虽然 a 的结果是
Pool.map
有序的(代价高昂),但不能保证它们将以什么顺序执行。由于是工作人员在写入磁盘,因此没有理由订购这些。我什至不明白你为什么要使用map
map 的全部意义在于返回计算结果,而你在这里没有这样做您不应该使用 Pool.map,也不应该“以附加模式保存到文件”。
另请注意,这Pool.close
意味着您不会将新工作交给池,它不会等待工作人员完成。现在从理论上讲,如果您只使用同步方法,这应该无关紧要,但是在这种情况下并给定(2),这可能是一个问题:当父进程退出时,池可能会被垃圾收集,这意味着它会硬关闭泳池工人。
推荐阅读
- mongodb - 如何查询多个MongoDB集合?
- scroll - 同步滚动多个可滚动小部件
- python - input() 将一个答案与另一个答案联系起来
- php - 用php代码解决税收问题的数学公式
- javascript - 如何检查对象是一个集合?
- java - JAVA:如何将多个成员添加到 LDAP 中的组
- javascript - 我们是否*必须*使用 data: URIs (readAsDataURL)在 JavaScript中使用 File 对象?
- android - 当我从 navigation.xml 转换到其他片段时,RecyclerView 是空白的
- elasticsearch - 如何计算两个事件的发生,然后显示提升的百分比并在 kibana 中获取指标?
- android - 如何在不使用操作栏且没有任何物理按钮的情况下退出应用程序?