首页 > 解决方案 > Spark .load() 是否将所有数据放入 DF,然后执行 .select("fields")?

问题描述

我读到 Spark 只检索需要的数据,但我如何使用 Scala 进行检查?我正在使用 Scala 将数据从 ES 索引加载到 Spark DF。并且只需要选择需要的字段,如果我使用这个:

val indexData = sparkSession.read
    .format("es")
    .option("scroll.limit", 100000)
    .load(index)
    .select("country")

spark会加载记录的所有字段,然后选择“国家”还是先选择“国家”,然后才加载数据?

标签: scalaapache-sparkelasticsearch

解决方案


您可以检查“模式下推”(仅从源加载选择列)在物理查询计划中是否有效。

以这个简单的片段为例,并在您的本地机器上运行它:

import org.apache.spark.sql._

object App {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._

    Seq((1,"aa"),(2,"bb"),(3, "cc")).toDF("id", "value").write.mode("overwrite").parquet("tmp_data")
    val df = spark.read.parquet("tmp_data").select("id")
    df.explain
  }
}

输出应该类似于:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#13] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/gabriel/IdeaProjects/SparkTests/tmp_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>

根据ReadSchema: struct<id:int>您可以看到,只有该id列的数据是从源加载的。


推荐阅读