首页 > 解决方案 > Spark数据框:派生列的连接方法

问题描述

给定如下代码 ( df) 中的数据集,我的要求是能够添加派生列 ( DerivedCol)。该列的值对于一idcol组行是恒定的,并且是通过应用 a) 对另一列的值(filter此处)的谓词,然后 b)max在匹配组上的聚合函数(此处使用)得出的。

val df = Seq(("id1","k1","7"),("id2","k1","5"),("id1","k3","2"),("id3","k1","4"),("id2","k5","1"),("id4","k5","1"))
  .toDF("idcol","keycol","valcol")

val aggDf = df.filter($"keycol" === "k1")
  .select($"idcol",$"valcol")
  .groupBy($"idcol")
  .agg(max($"valcol".cast(IntegerType)).cast(StringType).as("DerivedCol"))
  .withColumnRenamed("idcol", "newidcol")

df.join(aggDf, df("idcol") === aggDf("newidcol"), "left_outer")
  .drop(aggDf("newidcol"))

我正在left outer join为此使用。我的数据集非常庞大(数百万行)。我有以下问题:

  1. 有没有其他方法可以实现这一目标?
  2. 我应该使用什么分区逻辑来减少洗牌?

列的基数idcol非常高。Spark 版本是 2.1.1。

标签: scalaapache-sparkapache-spark-sql

解决方案


有没有其他方法可以实现这一目标?

有 - 窗口功能。

import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window

df.withColumn(
   "derivedcol",  
   max($"valcol".cast(IntegerType)).over(Window.partitionBy($"idcol")
)

根据:

  • 基数 - 高基数是好的。
  • 组的大小分布——没有大的正偏差的小组是好的。

这可能比聚合后加入的表现更好或更差。

我应该使用什么分区逻辑来减少洗牌?

可能没有。至少有两个原因:

  • 如果你有大量的小组窗口函数就可以了,不需要额外的分区。
  • 如果您有少量较大的组数据,则应广播数据,并且唯一需要的随机播放是聚合。
  • 如果有大量的大组 - 您可能会考虑按 id 进行预分区,但根据因素的数量,您可以在松散和增益的情况下,并且平均而言没有额外的 shuffle(分区)更好。

推荐阅读