scala - Spark数据框:派生列的连接方法
问题描述
给定如下代码 ( df
) 中的数据集,我的要求是能够添加派生列 ( DerivedCol
)。该列的值对于一idcol
组行是恒定的,并且是通过应用 a) 对另一列的值(filter
此处)的谓词,然后 b)max
在匹配组上的聚合函数(此处使用)得出的。
val df = Seq(("id1","k1","7"),("id2","k1","5"),("id1","k3","2"),("id3","k1","4"),("id2","k5","1"),("id4","k5","1"))
.toDF("idcol","keycol","valcol")
val aggDf = df.filter($"keycol" === "k1")
.select($"idcol",$"valcol")
.groupBy($"idcol")
.agg(max($"valcol".cast(IntegerType)).cast(StringType).as("DerivedCol"))
.withColumnRenamed("idcol", "newidcol")
df.join(aggDf, df("idcol") === aggDf("newidcol"), "left_outer")
.drop(aggDf("newidcol"))
我正在left outer join
为此使用。我的数据集非常庞大(数百万行)。我有以下问题:
- 有没有其他方法可以实现这一目标?
- 我应该使用什么分区逻辑来减少洗牌?
列的基数idcol
非常高。Spark 版本是 2.1.1。
解决方案
有没有其他方法可以实现这一目标?
有 - 窗口功能。
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window
df.withColumn(
"derivedcol",
max($"valcol".cast(IntegerType)).over(Window.partitionBy($"idcol")
)
根据:
- 基数 - 高基数是好的。
- 组的大小分布——没有大的正偏差的小组是好的。
这可能比聚合后加入的表现更好或更差。
我应该使用什么分区逻辑来减少洗牌?
可能没有。至少有两个原因:
- 如果你有大量的小组窗口函数就可以了,不需要额外的分区。
- 如果您有少量较大的组数据,则应广播数据,并且唯一需要的随机播放是聚合。
- 如果有大量的大组 - 您可能会考虑按 id 进行预分区,但根据因素的数量,您可以在松散和增益的情况下,并且平均而言没有额外的 shuffle(分区)更好。
推荐阅读
- java - Java Mission Control 是否包含在 JDK 中?
- excel - 每小时在 14、17、44、47 分钟运行第一个宏,在 22.52 分钟运行第二个宏
- c - 错误:整数寄存器的大小不受支持
- python - 我是否应该始终传递用户密码进行加密?
- ruby-on-rails - Rails渲染:json:为什么它返回带有纯文本密码的配置->数据?
- r - 在R中使用哪种类型的连接,最好是tidyverse
- java - 如何在 CameraX 预览上设置一个框,以便使用 Java 中的 ImageAnalysis 对其进行处理?
- c++ - 容器的 begin() 和 end() 的不同实现
- scala - ConcurrentHashMap[String, AtomicInteger] 或 ConcurrentHashMap[String, Int] 用于线程安全计数器?
- c++ - Zobrist 转置表中的碰撞