pandas - Processing a huge file (>30GB) in Python
Question
I need to process a huge file of about 30 GB containing hundreds of millions of lines. More precisely, I want to perform the following three steps:
1. Read the file by chunks: given how big it is, I don't have enough memory to read the whole file in one go;
2. Compute things on each chunk, aggregating it down to a more manageable size;
3. Concatenate the aggregated chunks into a final dataset containing the results of my analysis.
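The three steps above can be sketched with plain pandas before any threading is involved; the in-memory sample data and the column names are placeholders for illustration:

```python
import io

import pandas as pd


def aggregate_in_chunks(source, chunksize=4):
    """Read `source` by chunks, shrink each chunk, and combine the results."""
    partials = []
    reader = pd.read_table(source, sep=";", chunksize=chunksize,
                           names=["Row", "Category", "Value"])
    for chunk in reader:  # step 1: read the file by chunks
        # step 2: aggregate each chunk down to a per-category sum
        partials.append(chunk.groupby("Category")["Value"].sum())
    # step 3: concatenate the partial sums, then reduce once more
    # to obtain the final per-category totals
    return pd.concat(partials).groupby(level=0).sum()


data = io.StringIO("1;0;10\n2;1;20\n3;0;30\n4;1;40\n5;0;50\n")
result = aggregate_in_chunks(data)
```

The key point is that step 3 needs a second `groupby`: the same category can appear in several chunks, so the per-chunk sums must themselves be summed.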
So far, I have coded two threads:
- one thread in charge of reading the file by chunks and storing the chunks in a queue (step 1);
- one thread in charge of performing the analysis on the chunks (step 2).
Here is the spirit of my code so far, with dummy data:
```python
import concurrent.futures
import os
import queue
import random
import threading
import time

import pandas as pd


def process_chunk(df):
    return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)


def producer(chunk_queue, event, full_path):
    print("Producer: Reading the file by chunks")
    reader = pd.read_table(full_path, sep=";", chunksize=10000,
                           names=["Row", "Category", "Value"])
    for index, chunk in enumerate(reader):
        print(f"Producer: Adding chunk #{index} to the queue")
        chunk_queue.put((index, chunk))
        time.sleep(0.2)
    print("Producer: Finished putting chunks")
    event.set()
    print("Producer: Event set")


def consumer(chunk_queue, event, result_list):
    # The consumer stops iff the queue is empty AND the event is set
    # <=> the consumer keeps going iff the queue is not empty OR the event is not set
    while not chunk_queue.empty() or not event.is_set():
        try:
            index, chunk = chunk_queue.get(timeout=1)
        except queue.Empty:
            continue
        print(f"Consumer: Retrieved chunk #{index}")
        print(f"Consumer: Queue size {chunk_queue.qsize()}")
        result_list.append(process_chunk(chunk))
        time.sleep(0.1)
    print("Consumer: Finished retrieving chunks")


if __name__ == "__main__":
    # Record the execution time
    start = time.perf_counter()
    # Generate a fake file in the current directory if necessary
    path = os.path.dirname(os.path.realpath(__file__))
    filename = "fake_file.txt"
    full_path = os.path.join(path, filename)
    if not os.path.exists(full_path):
        print("Main: Generating a dummy dataset")
        with open(full_path, "w", encoding="utf-8") as f:
            for i in range(100000):
                value = random.randint(1, 101)
                category = i % 2
                # Write the columns in the same order as the names passed
                # to read_table: Row;Category;Value
                f.write(f"{i+1};{category};{value}\n")
    # Queue storing the chunks of the file read by the producer; named
    # chunk_queue so it does not shadow the queue module, which the
    # consumer's `except queue.Empty` relies on
    chunk_queue = queue.Queue(maxsize=5)
    # Event set by the producer when it is done
    event = threading.Event()
    # List storing the chunks processed by the consumer
    result_list = list()
    # Launch the producer and consumer threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, chunk_queue, event, full_path)
        executor.submit(consumer, chunk_queue, event, result_list)
    # Display that the program is finished
    print("Main: Consumer & Producer have finished!")
    print(f"Main: Number of processed chunks = {len(result_list)}")
    print(f"Main: Execution time = {time.perf_counter()-start} seconds")
```
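The code above stops after step 2: `result_list` ends up holding one small Category/Value frame per chunk. Assuming each element has the shape that `process_chunk` returns, step 3 can be finished with a concat followed by one more aggregation:

```python
import pandas as pd

# Two dummy partial results in the shape process_chunk returns
result_list = [
    pd.DataFrame({"Category": [0, 1], "Value": [40, 60]}),
    pd.DataFrame({"Category": [0, 1], "Value": [5, 7]}),
]

# Step 3: concatenate the per-chunk aggregates, then aggregate once more,
# since the same category appears in several chunks
final = (pd.concat(result_list, ignore_index=True)
           .groupby("Category", as_index=False)["Value"].sum())
```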
I know that each iteration of step 1 takes more time than each iteration of step 2, i.e. the consumer will always be waiting on the producer.
How can I speed up the process of reading my file by chunks (step 1)?
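For reference, two commonly cited knobs that make chunked parsing itself cheaper are passing explicit `dtype`s (so the parser skips type inference on every chunk) and using a larger `chunksize` to amortize per-chunk overhead. A minimal sketch of the reader with these hints; the concrete dtypes are assumptions about the data:

```python
import io

import pandas as pd

data = io.StringIO("1;0;10\n2;1;20\n3;0;30\n")

# Explicit dtypes avoid per-chunk type inference; a larger chunksize
# spreads the fixed cost of each read over more rows
reader = pd.read_table(
    data, sep=";",
    names=["Row", "Category", "Value"],
    dtype={"Row": "int64", "Category": "int8", "Value": "int64"},
    chunksize=100_000,
)
total = sum(chunk["Value"].sum() for chunk in reader)
```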