首页 > 解决方案 > 即使列不在数据框中,Spark 也会下推过滤器

问题描述

我有一个DataFrame列:

field1, field1_name, field3, field5, field4, field2, field6

我正在选择它,以便我只保留field1, field2, field3, field4. 注意field5select后面没有。

之后,我有一个使用的过滤器,field5我希望它会抛出一个分析错误,因为该列不存在,而是它正在过滤原始DataFrame(在选择之前),因为它正在按下过滤器,如下所示:

== Parsed Logical Plan ==
'Filter ('field5 = 22)
+- Project [field1#43, field2#48, field3#45, field4#47]
+- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Analyzed Logical Plan ==
field1: string, field2: string, field3: string, field4: string
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (field5#46 = 22)
+- Project [field1#43, field2#48, field3#45, field4#47, field5#46]
+- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Optimized Logical Plan ==
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (isnotnull(field5#46) && (field5#46 = 22))
+- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Physical Plan ==
  *Project [field1#43, field2#48, field3#45, field4#47]
+- *Filter (isnotnull(field5#46) && (field5#46 = 22))
+- *FileScan csv [field1#43,field3#45,field5#46,field4#47,field2#48] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/..., PartitionFilters: [], PushedFilters: [IsNotNull(field5), EqualTo(field5,22)], ReadSchema: struct<field1:string,field3:string,field5:string,field4:stri...

如您所见,物理计划在项目之前有过滤器......这是预期的行为吗?我会期待一个分析异常而不是......

该问题的可重现示例:

val df = Seq(
      ("", "", "")
    ).toDF("field1", "field2", "field3")

    val selected = df.select("field1", "field2")
    val shouldFail = selected.filter("field3 == 'dummy'") // I was expecting this filter to fail
    shouldFail.show()

输出:

+------+------+
|field1|field2|
+------+------+
+------+------+

标签: scalaapache-spark

解决方案


Dataset/Dataframe上的文档描述了您观察得很好的原因:

“数据集是‘惰性的’,即仅在调用操作时触发计算。在内部,数据集表示描述生成数据所需计算的逻辑计划。当调用操作时,Spark 的查询优化器会优化逻辑计划并生成物理计划,以并行和分布式方式高效执行。”

重要部分以粗体突出显示。当应用selectfilter语句时,它只是被添加到一个逻辑计划中,只有在应用一个动作时才会被 Spark 解析。在解析这个完整的逻辑计划时,Catalyst Optimizer 会查看整个计划,其中一个优化规则是下推过滤器,这就是您在示例中看到的。

我认为这是一个很棒的功能。即使您对在最终 Dataframe 中看到此特定字段不感兴趣,它也知道您对某些原始数据不感兴趣。

这是 Spark SQL 引擎相对于 RDD 的主要优势。它了解正在尝试做什么,而不会被告知如何去做。


推荐阅读