首页 > 解决方案 > 在 for 循环中使用 iter 时 Python lxml 内存不足

问题描述

我正在编写一个解决方案以从文件中提取信息。这些文件是通过其他一些脚本在 Windows 事件实用程序命令中生成的(我没有调用,只是接收文件进行解析):

wevtutil qe Application /q:"*[System[Provider[@Name='NameOfTheSourceApplication']]]" >> %FILE%

此命令将有关查询的源应用程序的所有输出保存到转储文件中,最终每行上的每个事件都有一个 XML。我只关心EventDataand TimeCreated SystemTime

示例输出:

<?xml version="1.0" encoding="UTF-8"?>
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
   <System>
      <Provider Name="" />
      <EventID Qualifiers="">0</EventID>
      <Level>4</Level>
      <Task>0</Task>
      <Keywords />
      <TimeCreated SystemTime="2018-10-02T11:19:41.000000000Z" />
      <EventRecordID />
      <Channel>Application</Channel>
      <Computer />
      <Security />
   </System>
   <EventData>
      DATA
      <Data />
   </EventData>
</Event>

文件转储完成后,文件可能会很大(超过 6-7GB)。所以我正在使用 Linuxiconv实用程序将源文件编码从UTF-16/UCS2-LE(wevutil 的默认编码) 更改为UTF-8,新的编码几乎减少了文件大小的一半。然后我使用grouper配方结合一些简单的文件拆分功能,以便将大转储文件拆分为较小的文件:

def grouper(n, iterable, fillvalue=None):
   """Collect data into fixed-length chunks or blocks"""
   # grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
   args = [iter(iterable)] * n
   return zlg(fillvalue=fillvalue, *args)

def splitter(fileobj,outName,ranoutName,rencode,wencode):
    with open(fileobj,"r",encoding='UTF-8',errors='replace') as f:
        for i, g in enumerate(grouper(n, f, fillvalue=''), 1):
            with open('{0}_{1}.xml'.format(i,outName), 'w',encoding=wencode) as fout:
                fout.writelines(g)
                print("Splitting file : %s" % (fileobj))

由于这些文件实际上不是 XML 文件,而是每一行都格式化为带有命名空间的 xml,因此我将一个根标记一一添加到每个拆分的文件中,以便稍后解析lxml(glst 代表“全局列表”)。

def rooter(glst):
    for logFiles in glst:
        oFile = open(logFiles,'r',encoding='utf-8')
        rFile = oFile.read()
        wFile = open(logFiles,'w',encoding='utf-8')
        wFile.write('<root>')
        wFile.write(rFile)
        wFile.write('</root>')
        oFile.close()
        wFile.close()

        print("Rooting XML : %s" % (logFiles))

然后我只加载一个要解析的 XML 文件lxml

def loadXml(fileobj):
    tree = etree.parse(fileobj)
    print("Processing : %s" % (fileobj))
    return tree

这是我的瓶颈,因为我没有找到任何其他方便的方法来有效地解析文件,而我只寻找Event Data和我的Event Time. 找到数据后,我将我的发现附加到两个单独的列表(一个用于事件数据,一个用于事件时间),稍后我将其转换为简单的 CSV 文件,以便通过pandas.

此代码实际上适用于 2GB 以下的文件,但在解析 2GB 以上的文件时完全耗尽内存,我的解决方案必须在只有 2-3GB 可用 RAM 的系统(Windows 64 位桌面)中运行。

def parser(tree,DataL,DataTimeL):
    for evts in tree.iter('{%s}EventData' % nameSpace):
        EvtData = evts.find('{%s}Data' % nameSpace).text
        DataL.append(EvtData)        
    for evtSysTime in tree.iter('{%s}System' % nameSpace):
        eSysTime = evtSysTime.find('{%s}TimeCreated' % nameSpace).attrib
        DataTimeL.append(eSysTime)
        break

我在解析后手动尝试gc.collectdel文件对象,但它似乎没有任何效果,python 一直在建立内存,直到 PC 崩溃。

def initParser(glst,DataL,DataTimeL):
    for file in glst:
     root = loadXml(file)
     parser(root,DataL,DataTimeL)
     gc.collect()
     del file

CSV 创建(zlg 代表 itertools - zip_longest):

with open('LogOUT.csv', 'w', encoding="UTF-8", newline='') as cF:
    wr = csv.writer(cF)
    wr.writerow(("Event", "Event Time"))
    wr.writerows(zlg(EvtL,EvtTimeL))

我尝试过使用 TinyDB、ZODB,这听起来有点矫枉过正,而且速度太慢或者我做错了。手动将事件转储到 CSV 非常慢。由于for循环解析器功能实际上对 2GB 以下的文件非常有效,因此我想找到一种安全有效地附加这些大列表而不会使整个系统崩溃的方法。

提前致谢。

标签: pythonxmlmemorylxml

解决方案


这是一个概念证明,它使用迭代器读取一个大文件,该文件由具有自包含 XML 的行组成,并将特定字段提取到 CSV 文件中。修改它以满足您的需要。

import csv
import itertools
import typing
from io import StringIO
from xml.etree import ElementTree
from xml.etree.ElementTree import Element


def grouper(iterable, n, fill=None) -> typing.Iterator:
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fill)


def parse_event_xml(event_xml: str) -> dict:
    root: Element = ElementTree.fromstring(event_xml)
    namespaces = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}
    time_el = root.find('./ns:System/ns:TimeCreated', namespaces=namespaces)
    data_el = root.find('./ns:EventData', namespaces=namespaces)

    return {
        'Event Time': time_el.attrib['SystemTime'],
        'Event Data': data_el.text,
    }


def process_batch(batch: typing.Iterator[str], batch_filename: str) -> None:
    fields = ['Event Time', 'Event Data']
    with open(batch_filename, 'w', newline='') as bf:
        writer = csv.DictWriter(bf, fieldnames=fields)
        writer.writeheader()
        for item in batch:
            if not item:  # skip empty lines
                continue
            parsed = parse_event_xml(item)
            writer.writerow(parsed)


if __name__ == '__main__':
    xml_raw = '''<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name=''/><EventID Qualifiers=''>0</EventID><Level>4</Level><Task>0</Task><Keywords></Keywords><TimeCreated SystemTime='2018-10-02T11:19:41.000000000Z'/><EventRecordID></EventRecordID><Channel>Application</Channel><Computer></Computer><Security/></System><EventData>DATA<Data></Data></EventData></Event>
    <Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'><System><Provider Name=''/><EventID Qualifiers=''>0</EventID><Level>4</Level><Task>0</Task><Keywords></Keywords><TimeCreated SystemTime='2018-10-02T11:19:41.000000000Z'/><EventRecordID></EventRecordID><Channel>Application</Channel><Computer></Computer><Security/></System><EventData>DATA<Data></Data></EventData></Event>'''

    batch_size = 10  # lines / events

    # read the event stream
    # normally you'd use `with open(filename, encoding='utf-8')`
    # but here i'm reading from a string
    with StringIO(xml_raw) as f:
        for i, batch in enumerate(grouper(f, batch_size)):
            batch_filename = f'batch_{i}.csv'
            process_batch(batch, batch_filename)

调整parse_event_xml函数来提取你需要的数据,这里我只用过EventTimeEventData

这个输出一个像这样的csv文件:

Event Time,Event Data
2018-10-02T11:19:41.000000000Z,DATA
2018-10-02T11:19:41.000000000Z,DATA

推荐阅读