首页 > 解决方案 > 使用 Python 处理不适合内存的文件

问题描述

我有一个包含数百万 XML 文件的大 tar 文件(总共 700GB)。这些 XML 文件有很多垃圾数据,我尝试解析它们,获取我需要的详细信息并将它们存储在 CSV 中。

我的第一步是将 tar 文件拆分为更小的(每个约 1-1.5GB)文件。现在,我需要浏览所有 tar 文件,阅读它们,获取信息并将其存储在 2 个不同的 CSV 文件中。

我的代码:

import tarfile
import csv  
import glob 
from multiprocessing import Process
import xml.etree.ElementTree as ET

def main(index, tar_file):

    tar = tarfile.open(tar_file)

    file1 = open('file1_' + str(index) + '.csv', "w")
    file2 = open('file2_' + str(index) + '.csv', "w")

    writer1 = csv.writer(file1, delimiter=',')
    writer2 = csv.writer(file2, delimiter=',')

    for member in tar:
        if member.isreg() and member.name.endswith('.xml'): # regular xml file
            with closing(tar.extractfile(member)) as xmlfile:
                root = ET.parse(xmlfile).getroot()
                if <statement>:
                    #get the data I want from root
                    writer1.writerow(<some data>)

                if <statement>:   
                    #get the data I want from root      
                    writer2.writerow(<some data>)
    workFile.close()
    peerFile.close()  
    tar.close()               

if __name__ == '__main__':

    files = [f for f in glob.glob("data/*.tar", recursive=True)]  
    procs = []
    for index, f in enumerate(files):
        proc = Process(target=main, args=(index, f,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

我是这样做的,所以我不在内存中保留任何内容并逐行写入文件。但是,在运行上述代码一段时间后,我的笔记本电脑刚刚关闭。我想,代码中有一部分会填满内存。我如何处理这种情况而无需立即阅读所有内容?

标签: pythonmultiprocessingbigdata

解决方案


目前还不清楚为什么您的笔记本电脑会关闭。这可能是“内存不足”和“文件描述符不足”的错误组合(您产生了很多进程,每个进程打开 3 个文件,是吗?)并且可能是您的操作系统中的错误或某些硬件故障。

无论哪种方式,您都可以尝试通过减少衍生进程的数量来避免它。首先,为每个文件生成一个进程没有任何好处。经验法则是:永远不要产生超过,比如说 [3 x 核心数] 并行函数(当你执行纯粹的 CPU 密集型任务时,通常只有 [核心数] 就足够了,但你确实有少量 i/o以及)。

所以而不是

files = [f for f in glob.glob("data/*.tar", recursive=True)]  
procs = []
for index, f in enumerate(files):
    proc = Process(target=main, args=(index, f,))
    procs.append(proc)
    proc.start()

for proc in procs:
    proc.join()

尝试这个

from multiprocessing import Pool, cpu_count
pool = Pool(2*cpu_count())  # or 3, do some empirical testing
files = [f for f in glob.glob("data/*.tar", recursive=True)]  
procs = []
for index, f in enumerate(files):
    pool.apply_async(main, (index, f,))

pool.close()
pool.join()

在此处阅读有关池的更多信息:https ://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

如果您使用的是 Python3.x,您也可以尝试执行器:https ://docs.python.org/3/library/concurrent.futures.html


推荐阅读