首页 > 解决方案 > 如何在 numpy/pandas 中处理来自 spark 的大型镶木地板文件?

问题描述

我为 pandas、numpy 和 spark 标签发布了这个,因为我不确定在这三个系统中解决这个问题的最佳方法。

我有一个大型镶木地板文件,下游进程无法打开,因为它超出了系统的内存(如果一次打开,内存中约 63gb)。我是这样写文件的:

FULL_MAIN.write.mode("overwrite").parquet(PATH+"/FULL_MAIN.parquet")

但是文件太大了,所以我尝试这样做以将文件分成更小的夹头:

    split_factor = [.1,.1,.1,.1,.1,.1,.1,.1,.1,.1]
    FULL_MAIN_RDD1,FULL_MAIN_RDD2,FULL_MAIN_RDD3,FULL_MAIN_RDD4,FULL_MAIN_RDD5, FULL_MAIN_RDD6,FULL_MAIN_RDD7,FULL_MAIN_RDD8,FULL_MAIN_RDD9,FULL_MAIN_RDD10  = FULL_MAIN.randomSplit(split_factor)
FULL_MAIN_RDD1.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD1.parquet")
FULL_MAIN_RDD2.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD2.parquet")
...

这种方法的问题是我需要保持行对齐的其他数据帧,并且进行这种随机拆分会使数据帧不对齐。

所以我的两个问题是:

  1. 当我的数据集中的每一行没有任何行号或数字计数器时,有没有办法以相对相等的数量拆分多个数据帧?
  2. 有没有办法在 pandas 或 numpy 中批量读取镶木地板文件?这基本上可以解决我在下游系统上的问题。我不知道如何分批打开镶木地板(我试图在 pandas 中打开它,然后拆分行并保存每个文件,但是当我加载数据框时,它会使我的系统崩溃)。我不确定在不超过内存的情况下是否有可能。

标签: pandasnumpyapache-sparkpysparkparquet

解决方案


Parquet 文件格式支持行组。创建parquet文件时安装pyarrow使用:row_groups

df.to_parquet("filename.parquet", row_group_size=10000, engine="pyarrow")

然后您可以逐组阅读(甚至只阅读特定组):

import pyarrow.parquet as pq

pq_file = pq.ParquetFile("filename.parquet")
n_groups = pq_file.num_row_groups
for grp_idx in range(n_groups):
    df = pq_file.read_row_group(grp_idx, use_pandas_metadata=True).to_pandas()
    process(df)

如果您无法控制 parquet 文件的创建,您仍然只能读取文件的一部分:

pq_file = pq.ParquetFile("filename.parquet")
batch_size = 10000 # records

batches = pq_file.iter_batches(batch_size, use_pandas_metadata=True) # batches will be a generator    
for batch in batches:
    df = batch.to_pandas()
    process(df)

推荐阅读