首页 > 解决方案 > 为什么 Spark DataFrame 转换为 RDD 需要完全重新映射?

问题描述

从 Spark 源代码:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

可能需要与首先计算的mapPartitions时间一样长RDD。所以这使得操作如

df.rdd.getNumPartitions

非常贵。鉴于 a DataFrameisDataSet[Row]和 a DataSetis 由RDD's 组成,为什么需要重新映射?任何见解表示赞赏。

标签: scalaapache-spark

解决方案


TL;DR那是因为内部RDD不是RDD[Row].

假设一个 DataFrame 是DataSet[Row]并且 aDataSet是由 RDD 组成的

这是一个巨大的过度简化。首先DataSet[T] 并不意味着您与 .container 的容器进行交互T。这意味着如果您使用类似集合的 API(通常称为强类型),内部表示将被解码T.

内部表示是 Tungsten 内部使用的二进制格式。这种表示是内部的,可能会发生变化,而且级别太低,无法在实践中使用。

公开此数据的中间表示是InternalRow-rddQueryExecution.toRDD实际上是RDD[InternalRow]。这种表示(有不同的实现)仍然暴露了内部类型,被认为是“弱”私有的,因为其中的所有对象o.a.s.sql.catalyst(访问没有明确限制,但 API 没有记录),并且交互起来相当棘手。

这就是解码发挥作用的地方,以及为什么需要完整的“重新映射”——将内部的、通常不安全的对象转换为供公众使用的外部类型。

最后,重申我之前的陈述——有问题的代码在被调用时不会被执行getNumPartitions


推荐阅读