首页 > 解决方案 > 当有过滤器和效率建议时,Spark 如何处理 parquet 文件

问题描述

假设您有非常大的 parquet 文件,您想要从中过滤一个子集并保存它:

val df = spark.read.parquet(inputFileS3Path)
    .select(c1, c2, c3)
    .where("c1 = '38940f'")
df.write.parquet(outputFileS3Path)

Spark 会先在内存中读取所有 parquet 文件,然后再进行过滤吗?有没有一种方法,例如,Spark 只读取一个批次并只将满足过滤条件的记录保存在内存中?

我在 Zeppelin 笔记本中运行 Spark 2.2,似乎正在发生的事情是它在内存中读取所有内容,然后进行过滤,导致进程有时崩溃(在 Spark Web UI 中,阶段中的输入就像 > 1TB 但S3 中的输出为 1 MB)。

是否有更有效的方法来过滤这些文件(是否更改代码、文件格式、Spark 版本等)?我已经只选择了列的一个子集,但这似乎还不够。

更新

经过进一步调查,我注意到 Spark 正在读取所有内容,以防过滤器位于嵌套字段上:

val df = spark.read.parquet(inputFileS3Path)
    .select(c1, c2, c3)
    .where("c1.a = '38940f'")
df.write.parquet(outputFileS3Path)

而且我认为该功能仍未实现(请参阅https://issues.apache.org/jira/browse/SPARK-17636)。除了用嵌套字段显式重写所有镶木地板之外,您还有什么技巧吗?有没有办法强制优化器制定更好的计划?

标签: apache-sparkparquet

解决方案


Spark 支持顶级字段的谓词下推。

使用df.explain()方法检查查询计划。您应该会看到如下内容:

   +- FileScan parquet [c1#413,c2#414,c3#415] Batched: false, 
      Format: Parquet, Location: TahoeLogFileIndex[file:/inputFileS3Path], PartitionCount: 4320, PartitionFilters: [], 
      PushedFilters: [IsNotNull(c1), EqualTo(c1,38940f)], 
      ReadSchema: struct<c1:string,c2:string,c3:string>

重要的部分是检查谓词下推的PushedFilters和检查模式修剪的ReadSchema 。

正如您在问题中提到的,不支持嵌套字段的谓词下推。但是,您可以通过模式修剪优化嵌套字段的查询性能。

检查我在这个线程上的答案。


推荐阅读