Cannot pass an Array into a function parameter that accepts varargs

Problem

I am trying to write a function that enriches a given DataFrame with a "session" column, computed with a window function, so I need to use partitionBy and orderBy:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

// A session = a day of events for a certain user: uuid x (year + month + day)
val by_uuid_per_date = Window.partitionBy("uuid").orderBy("year", "month", "day")

val enriched_df = df
  .withColumn("session", dense_rank().over(by_uuid_per_date))
  .orderBy("uuid", "timestamp")
  .select("uuid", "year", "month", "day", "session")

This works perfectly, but when I try to write a function that encapsulates this behavior:

PS: I used the _* splat operator.

def enrich_with_session(df: DataFrame,
                        window_partition_cols: Array[String],
                        window_order_by_cols: Array[String],
                        presentation_order_by_cols: Array[String]): DataFrame = {

  val by_uuid_per_date = Window.partitionBy(window_partition_cols: _*).orderBy(window_order_by_cols: _*)

  df.withColumn("session", dense_rank().over(by_uuid_per_date))
    .orderBy(presentation_order_by_cols: _*)
    .select("uuid", "year", "month", "day", "session")
}

I get the following error:

notebook:6: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
val by_uuid_per_date = Window.partitionBy(window_partition_cols: _*).orderBy(window_order_by_cols: _*)
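The reason for this error (not spelled out in the original post, but consistent with it) is overload resolution: Spark's Window.partitionBy has both a (colName: String, colNames: String*) overload and a (cols: Column*) overload, and a splatted Array[String] can only fill a parameter list that is a pure vararg. Here is a minimal plain-Scala sketch of that situation, with no Spark involved; varargsOnly and headPlusVarargs are hypothetical stand-ins for the two overload shapes:

```scala
// Stand-in for an overload like partitionBy(cols: Column*):
// a single vararg parameter.
def varargsOnly(cols: String*): Int = cols.length

// Stand-in for an overload like partitionBy(colName: String, colNames: String*):
// a fixed first parameter followed by a vararg.
def headPlusVarargs(first: String, rest: String*): Int = 1 + rest.length

val cols = Array("uuid", "year")

varargsOnly(cols: _*)             // fine: the whole array fills the vararg
// headPlusVarargs(cols: _*)      // does NOT compile: `: _*` cannot split the
//                                // array into `first` plus `rest`, which is
//                                // what "no `: _*' annotation allowed here" means
headPlusVarargs(cols.head, cols.tail: _*)  // fine: pass the head explicitly
```

Mapping the Array[String] to Array[Column] (as the accepted answer below does) makes the Column* overload applicable, which is why it resolves the error.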

Tags: scala, apache-spark, apache-spark-sql

Solution


partitionBy and orderBy expect a Seq[Column] or Array[Column] as argument, see below:

import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
import spark.implicits._  // for toDF and the 'symbol column syntax

val data = Seq(
  (1, 99),
  (1, 99),
  (1, 70),
  (1, 20)
).toDF("id", "value")

data.select('id, 'value, rank().over(Window.partitionBy('id).orderBy('value))).show()

val partitionBy: Seq[Column] = Seq(data("id"))
val orderBy: Seq[Column] = Seq(data("value"))
data.select('id, 'value, rank().over(Window.partitionBy(partitionBy: _*).orderBy(orderBy: _*))).show()

So in your case, the code should look like this:

def enrich_with_session(df: DataFrame,
                        window_partition_cols: Array[String],
                        window_order_by_cols: Array[String],
                        presentation_order_by_cols: Array[String]): DataFrame = {

  // Resolve the column names against df to get Columns,
  // which the Column* overloads of partitionBy/orderBy accept
  val window_partition_cols_2: Array[Column] = window_partition_cols.map(df(_))
  val window_order_by_cols_2: Array[Column] = window_order_by_cols.map(df(_))
  val presentation_order_by_cols_2: Array[Column] = presentation_order_by_cols.map(df(_))

  val by_uuid_per_date = Window.partitionBy(window_partition_cols_2: _*).orderBy(window_order_by_cols_2: _*)

  df.withColumn("session", dense_rank().over(by_uuid_per_date))
    .orderBy(presentation_order_by_cols_2: _*)
    .select("uuid", "year", "month", "day", "session")
}
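For completeness, a usage sketch of the fixed function, matching the windowing in the original question. This is hypothetical: it assumes a SparkSession is in scope and that df has uuid, timestamp, year, month and day columns, none of which is shown in the post:

```scala
// Hypothetical call, assuming `df` has uuid, timestamp, year, month, day columns
val enriched = enrich_with_session(
  df,
  window_partition_cols      = Array("uuid"),
  window_order_by_cols       = Array("year", "month", "day"),
  presentation_order_by_cols = Array("uuid", "timestamp")
)
enriched.show()
```

An alternative to mapping with df(_) is org.apache.spark.sql.functions.col, e.g. window_partition_cols.map(col), which does not require the names to be resolved against a particular DataFrame up front.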
