首页 > 解决方案 > 高效地将大熊猫数据写入不同的文件

问题描述

我有一个大约 200 万行(每行 80 列)的 pandas 数据框。

我想将数据框输出到 csv 以及镶木地板文件。

假设数据框存在于df变量中

初步方法

print('Creating csv and parquet files')
st = time.time()
df.to_csv('output_file.csv')
df.to_parquet('output_file.parquet')
print(f'Created csv and parquet files in {time.time() - st} seconds')

使用这种方法写入文件需要很长时间。我假设由于这两个是独立的操作,我可以利用多个进程。

较新的方法

def build_csv(dataframe, output_filename):
    print(f'Building csv: {output_filename}')
    dataframe.to_csv(output_filename)


def build_parquet(dataframe, output_filename):
    print(f'Building parquet: {output_filename}')
    dataframe.to_parquet(output_filename)


with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(build_csv, (df, 'output_file.csv'))
    executor.submit(build_parquet, (df, 'output_file.parquet'))

较新的方法运行成功,但我没有看到正在创建任何文件。我不确定为什么会这样。

是否有更好(更快)的方法将 pandas 数据帧写入不同的文件?

标签: pythonpandaspython-multiprocessingparquet

解决方案


编辑:我保留了下面的线程解决方案供您参考。但是,此解决方案应该解决 Python GIL 问题。我已经测试过了,可以看到文件已经写入成功:

from multiprocessing import Pool
import pandas as pd

# original data:
data = pd.DataFrame([
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,],
])    


def SaveDataToCsv(data):
    print('Started export to .csv')
    data.to_csv('data.csv')
    print('Finished export to .csv')


def SaveDataToParquet(data):
    print('Started export to .parquet')
    data.to_parquet('data.parquet')
    print('Finished export to .parquet')


# multiprocessing method:
pool = Pool(processes=2)
process1 = pool.apply_async(SaveDataToCsv, [data])
process2 = pool.apply_async(SaveDataToParquet, [data])

测试了threading图书馆,它似乎工作正常:

import pandas as pd
import threading

# original data:
data = pd.DataFrame([
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
])


def SaveDataToCsv(data):        
    data.to_csv('data.csv')


def SaveDataToParquet(data):
    data.to_parquet('data.parquet')    

thread1 = threading.Thread(target=SaveDataToCsv, args=(data,))
thread2 = threading.Thread(target=SaveDataToParquet, args=(data,))

thread1.start()
thread2.start()

推荐阅读