首页 > 解决方案 > 在 Spark 连接之后使用 scala 对象

问题描述

用例

我的数据被写成数据框,我想检查两个具有完全相同架构的数据框是否相等。具体来说,要检查每个 id 值是否来自第一个和第二个数据帧的记录相同。换句话说,假设每个数据帧的每个 id 都有一条记录,我希望将每个 id 的差异放在数据帧一和数据帧二的行之间。

我的假设是我需要实现一个新的数据帧(即通过连接操作),以便使用 Spark 大规模执行此操作。到目前为止,我在这个假设中是对的吗?

到目前为止,这是该方面的代码:

  val postsFromDF1: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects
  val postsFromDF2: Dataset[Post] = ... // dataframe read as a Dataset of Scala Objects

  val joined: DataFrame = postsFromDF1.as("df1").join(postsFromDF2.as("df2"), usingColumn = "id")

现在我想列出那些值​​不相同的 id 匹配对象之间的所有差异(当然,它们加入的共享 id 文件除外)。因为其中一些值本身就是对象的集合——在我看来,使用 scala 对象的对象树可能比在此连接后切换到列名级别的工作更具可读性或直觉。评论到现在?这是使用 Spark 的好方法吗?

我的最后一个问题

如何为连接的每一行完成一个对象表示对(每个原始数据帧对象一个对象),同时在比较对象时仍然享受 Spark 的并行性?

像这样的对象表示:

case class PostPair(post: Post, otherPost: Post, id: String)

我试过的

我尝试锤击这个实验性代码,但它在运行时失败;可能Encoders.product隐含的描述性不够。

  case class PostPair(post: Post, otherPost: Post, id: String)
  implicit val encoder = Encoders.product[PostPair]

  val joined: Dataset[PostPair] =
    postsFromDF1.as("df1")
      .join(postsFromDF2.as("df2"), usingColumn = "id")
      .as[PostPair]

附加信息

以下是我如何从每个数据帧中分别完成案例类的集合:

case class PostsParquetReader(spark: SparkSession) {
  /** default method applied when the object is called */
  def apply(path: String) = {
    val df = spark.read.parquet(path)
    toCaseClass(spark, df)
  }

  /** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
  private def toCaseClass(spark : SparkSession, idf : DataFrame)  = {
    import spark.implicits._
    idf.as[Post].flatMap(record => {
      Iterator[Post](record)
    })
  }
}

我觉得在 join 之后使用相同的对象强制方法可能会很麻烦,或者这种对象强制方法可能在 Spark 并行/分布式执行方面有其缺点。

另一方面,通过对象记录来比较和显示差异,就像数据是简单的 Scala 对象树一样工作(编码)似乎是最易读和最灵活的方法——因为它支持 Scala 集合 API 的标准利用。

标签: scaladataframeapache-sparkapache-spark-sql

解决方案


@Binzi 的解决方案可以工作并且需要一些改进。

@matanster您的方法很好,您可以使用DataSet API而不是DataFrame API。DataSet API 由 Scala 案例类支持,使复杂的操作更容易。与 DataFrame 相比,可扩展性相同,但性能略逊一筹。对于复杂的操作,我总是更喜欢 DataSet。

您可以直接从原始数据创建数据集,无需自己编写 toCaseClass(spark, df)。案例类架构必须与您的数据架构匹配。

    case class post ()//define all properties

    val spark : SparkSession = SparkSession.builder
      .appName("name")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._
    val postsFromDF1: Dataset[Post] = spark.read.parquet(path).as[post]
    val postsFromDF2: Dataset[Post] = spark.read.parquet(path).as[post]

    val joinedDs = postsFromDF1.joinWith(postsFromDF2)

joinDs 是一个元组(post,post)。然后您可以在此元组上应用逻辑并在其中发布对象(如您所想)。

表现

数据集需要将整个数据解码成对象才能进行操作。从本质上讲,它不能从列式存储中受益。但是在 Dataframe 中,您只能读取几列并对其进行操作,因为 parquet 是柱状的,因此可以节省大量时间来避免读取所有这些列。除此之外,我没有任何其他性能差异。可扩展性完全相同。


推荐阅读