Processing a huge file (>30GB) in Python

Problem description

I need to process a huge file of roughly 30GB containing several hundred million lines. More precisely, I want to perform the following three steps:

  1. Read the file in chunks: given the size of the file, I do not have enough memory to read it in one go;

  2. Compute something on each chunk before aggregating it down to a more manageable size;

  3. Concatenate the aggregated chunks into a final dataset containing the results of my analysis (a minimal sequential sketch of these three steps is shown right after this list).
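
Conceptually, setting the threads aside for a moment, these three steps map onto pandas' chunked-reading API roughly as follows. This is a minimal sequential sketch only; the file name, separator, column names and chunk size are the ones used in the code further down:

import pandas as pd

# Step 1: read the file lazily, 10,000 rows at a time
reader = pd.read_table("fake_file.txt", sep=";", chunksize=10000,
                       names=["Row", "Category", "Value"])

# Step 2: aggregate each chunk down to one partial sum per category
partials = [chunk.groupby("Category")["Value"].sum() for chunk in reader]

# Step 3: concatenate the partial sums and combine them into the final dataset
result = pd.concat(partials).groupby(level=0).sum().reset_index()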

So far, I have written two threads: a producer that reads the file in chunks and a consumer that processes them.

Here is the spirit of my code so far, with dummy data:

import queue
import threading
import concurrent.futures
import os
import random
import pandas as pd
import time

def process_chunk(df):
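    # Aggregate one chunk: sum "Value" per "Category", keeping "Category" as a regular column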
    return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)

def producer(chunk_queue, event):
    print("Producer: Reading the file by chunks")
    # full_path is the global defined in the __main__ block below
    reader = pd.read_table(full_path, sep=";", chunksize=10000, names=["Row","Category","Value"])
    for index, chunk in enumerate(reader):
        print(f"Producer: Adding chunk #{index} to the queue")
        chunk_queue.put((index, chunk))
        time.sleep(0.2)
    print("Producer: Finished putting chunks")
    event.set()
    print("Producer: Event set")

def consumer(chunk_queue, event, result_list):
    # The consumer stops iff the queue is empty AND the event is set
    # <=> The consumer keeps going iff the queue is not empty OR the event is not set
    while not chunk_queue.empty() or not event.is_set():
        try:
            index, chunk = chunk_queue.get(timeout=1)
        except queue.Empty:  # refers to the queue module, not the Queue instance
            continue
        print(f"Consumer: Retrieved chunk #{index}")
        print(f"Consumer: Queue size {chunk_queue.qsize()}")
        result_list.append(process_chunk(chunk))
        time.sleep(0.1)
    print("Consumer: Finished retrieving chunks")

if __name__=="__main__":
    # Record the execution time
    start = time.perf_counter()

    # Generate a fake file in the current directory if necessary
    path = os.path.dirname(os.path.realpath(__file__))
    filename = "fake_file.txt"
    full_path = os.path.join(path, filename)
    if not os.path.exists(full_path):
        print("Main: Generate a dummy dataset")
        with open(full_path, "w", encoding="utf-8") as f:
            for i in range(100000):
                value = random.randint(1,101)
                category = i%2
                f.write(f"{i+1};{value};{category}\n")

    # Defining a queue that will store the chunks of the file read by the Producer
    chunk_queue = queue.Queue(maxsize=5)  # named so it does not shadow the queue module

    # Defining an event that will be set by the Producer when it is done
    event = threading.Event()

    # Defining a list storing the chunks processed by the Consumer
    result_list = list()

    # Launch the threads Producer and Consumer
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(producer, chunk_queue, event),
                   executor.submit(consumer, chunk_queue, event, result_list)]
        for fut in futures:
            fut.result()  # propagate any exception raised inside the worker threads

    # Display that the program is finished
    print("Main: Consumer & Producer have finished!")
    print(f"Main: Number of processed chunks = {len(result_list)}")
    print(f"Main: Execution time = {time.perf_counter()-start} seconds")

I know that each iteration of step 1 takes more time than each iteration of step 2, i.e. the consumer will always be waiting for the producer.
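
One way to check this imbalance is to time a single iteration of each step on its own, e.g. with a rough sketch like the following (it reuses process_chunk and full_path from the code above):

import time
import pandas as pd

reader = pd.read_table(full_path, sep=";", chunksize=10000, names=["Row", "Category", "Value"])

t0 = time.perf_counter()
chunk = next(reader)            # one iteration of step 1: read and parse one chunk
t1 = time.perf_counter()
partial = process_chunk(chunk)  # one iteration of step 2: aggregate that chunk
t2 = time.perf_counter()

print(f"Reading one chunk:    {t1 - t0:.4f} s")
print(f"Processing one chunk: {t2 - t1:.4f} s")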

How can I speed up the process of reading the file by chunks (step 1)?

Tags: pandas, multithreading, queue, bigdata, chunks

Solution

