首页 > 解决方案 > 如何在 spark scala 中使用子查询创建列表达式

问题描述

给定任何df,我想为df计算另一个名为“has_duplicates”的列,然后添加一个带有布尔值的列,以确定每行是否唯一。示例输入 df:

val df = Seq((1, 2), (2, 5), (1, 7), (1, 2), (2, 5)).toDF("A", "B")

给定一个 input columns: Seq[String],我知道如何获取每一行的计数:

val countsDf = df.withColumn("count", count("*").over(Window.partitionBy(columns.map(col(_)): _*)))

但我不确定如何使用它为最后一列创建列表达式,指示每一行是否唯一。

就像是

def getEvaluationExpression(df: DataFrame): Column = {
    when("count > 1", lit("fail").otherwise(lit("pass"))
 }

但是需要使用上面的查询在现场评估计数。

标签: scalaapache-sparkapache-spark-sql

解决方案


试试下面的代码。

scala> df.withColumn("has_duplicates", when(count("*").over(Window.partitionBy(df.columns.map(col(_)): _*)) > 1 , lit("fail")).otherwise("pass")).show(false)
+---+---+--------------+
|A  |B  |has_duplicates|
+---+---+--------------+
|1  |7  |pass          |
|1  |2  |fail          |
|1  |2  |fail          |
|2  |5  |fail          |
|2  |5  |fail          |
+---+---+--------------+

或者

scala> df.withColumn("count",count("*").over(Window.partitionBy(df.columns.map(col(_)): _*))).withColumn("has_duplicates", when($"count" > 1 , lit("fail")).otherwise("pass")).show(false)
+---+---+-----+--------------+
|A  |B  |count|has_duplicates|
+---+---+-----+--------------+
|1  |7  |1    |pass          |
|1  |2  |2    |fail          |
|1  |2  |2    |fail          |
|2  |5  |2    |fail          |
|2  |5  |2    |fail          |
+---+---+-----+--------------+

推荐阅读