首页 > 解决方案 > 如何在 Spark 中对 DataSet 的窗口进行二次采样?

问题描述

假设我有一个DataSet看起来像这样的:

Name    | Grade
---------------
Josh    | 94
Josh    | 87
Amanda  | 96
Karen   | 78
Amanda  | 90
Josh    | 88

我想创建一个新的DataSet,其中每个名称有 3 行,其中额外的行(如果有)是从同名的行中采样的(例如,Karen 将有三个相同的行)。

如何在不遍历每个名​​称的情况下做到这一点?

标签: javaapache-sparksubsampling

解决方案


资料准备:

 val df = Seq(("Josh",94),("Josh",87),("Amanda",96),("Karen",78),("Amanda",90),("Josh",88)).toDF("Name","Grade")

仅当您的数据skewed用于 a时才执行以下Name操作:添加一个随机数,并为每个 过滤前 3 个随机数Name

val df2 = df.withColumn("random", round(rand()*10))

import org.apache.spark.sql.expressions.Window
val windowSpec  = Window.partitionBy("Name").orderBy("random")

val df3 = df2.withColumn("row_number",row_number.over(windowSpec))
             .filter($"row_number" <= 3)

现在,汇总每个 的值Name并重复 3 次,以确保每个 至少有 3 条记录Name。然后最后取第一个 3 个值,然后explode

df4.groupBy("Name").agg(collect_list("Grade") as "grade_list")
.withColumn("temp_list", slice( flatten(array_repeat($"grade_list", 3)), 1,3))
.select($"Name",explode($"temp_list") as "Grade").show

备注:

  • 由于上面的代码在 中最多有 3 个值grade_list,因此复制 3 次不会造成伤害。
  • 如果您不使用该Window步骤,则可以组合使用when( size($"grade_list") === n, ).otherwise()上述不必要的重复。

推荐阅读