首页 > 解决方案 > 如何在 Spark Dataframe 上有效地执行此列操作?

问题描述

我有一个数据框如下:

+---+---+---+
| F1| F2| F3|
+---+---+---+
|  x|  y|  1|
|  x|  z|  2|
|  x|  a|  4|
|  x|  a|  4|
|  x|  y|  1|
|  t| y2|  6|
|  t| y3|  4|
|  t| y4|  5|
+---+---+---+

我想添加另一列,其值为 ( number of unique rows of "F1" and "F2" for each unique "F3" / total number of unique rows of "F1" and "F2")。

例如,对于上表,下面是所需的新数据框:

+---+---+---+----+
| F1| F2| F3|  F4|
+---+---+---+----+
|  t| y4|  5| 1/6|
|  x|  y|  1| 1/6|
|  x|  y|  1| 1/6|
|  x|  z|  2| 1/6|
|  t| y2|  6| 1/6|
|  t| y3|  4| 2/6|
|  x|  a|  4| 2/6|
|  x|  a|  4| 2/6|
+---+---+---+----+

注意:在 的情况下F3 = 4,只有2唯一的F1F2= {(t, y3), (x, a)}。因此,对于所有出现的F3 = 4F4将是2/(total number of unique ordered pairs of F1 and F2. Here there are 6 such pairs)

如何在 Spark Scala 中实现上述转换?

标签: scalaapache-sparkapache-spark-sql

解决方案


我刚刚学会了尝试解决您的问题,即您在执行 Window over DataFrames 时不能使用 Distinct 函数。

所以我所做的是创建一个临时 DataFrame 并将其与初始数据帧连接以获得您想要的结果:

case class Dog(F1:String, F2: String, F3: Int)
val df = Seq(Dog("x", "y", 1), Dog("x", "z", 2), Dog("x", "a", 4), Dog("x", "a", 4), Dog("x", "y", 1), Dog("t", "y2", 6), Dog("t", "y3", 4), Dog("t", "y4", 5)).toDF
val unique_F1_F2 = df.select("F1", "F2").distinct.count
val dd = df.withColumn("X1", concat(col("F1"), col("F2")))
           .groupBy("F3")
           .agg(countDistinct(col("X1")).as("distinct_count"))
val final_df = dd.join(df, "F3")
                 .withColumn("F4", col("distinct_count")/unique_F1_F2)
                 .drop("distinct_count")
final_df.show
+---+---+---+-------------------+
| F3| F1| F2|                 F4|
+---+---+---+-------------------+
|  1|  x|  y|0.16666666666666666|
|  1|  x|  y|0.16666666666666666|
|  6|  t| y2|0.16666666666666666|
|  5|  t| y4|0.16666666666666666|
|  4|  t| y3| 0.3333333333333333|
|  4|  x|  a| 0.3333333333333333|
|  4|  x|  a| 0.3333333333333333|
|  2|  x|  z|0.16666666666666666|
+---+---+---+-------------------+

我希望这是你所期望的!

编辑:我将 df.count 更改为 unique_F1_F2


推荐阅读