scala spark: collect a list within a range into one row

Problem description

Suppose I have a dataframe like the following

+----+----------+----+----------------+
|colA|      colB|colC|            colD|
+----+----------+----+----------------+
|   1|2020-03-24|  21|[0.0, 2.49, 3.1]|
|   1|2020-03-17|  20|[1.0, 2.49, 3.1]|
|   1|2020-03-10|  19|[2.0, 2.49, 3.1]|
|   2|2020-03-24|  21|[0.0, 2.49, 3.1]|
|   2|2020-03-17|  20|[1.0, 2.49, 3.1]|
+----+----------+----+----------------+

I want to collect colD into a single row, gathering only the lists that fall within a range (based on colC) of the current row.

Expected output
+----+----------+----+----------------+------------------------------------+
|colA|colB      |colC|colD            |colE                                |
+----+----------+----+----------------+------------------------------------+
|1   |2020-03-24|21  |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]]|
|1   |2020-03-17|20  |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1], [2.0, 2.49, 3.1]]|
|1   |2020-03-10|19  |[2.0, 2.49, 3.1]|[[2.0, 2.49, 3.1]]                  |
|2   |2020-03-24|21  |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]]|
|2   |2020-03-17|20  |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1]]                  |
+----+----------+----+----------------+------------------------------------+

I tried the following, but it gives me this error:

cannot resolve 'RANGE BETWEEN CAST((`colC` - 2) AS STRING) FOLLOWING AND CAST(`colC` AS STRING) FOLLOWING' due to data type mismatch: Window frame lower bound 'cast((colC#575 - 2) as string)' is not a literal.;;
val data = Seq(("1", "2020-03-24", 21,  List(0.0, 2.49,3.1)), ("1", "2020-03-17", 20,  List(1.0, 2.49,3.1)), ("1", "2020-03-10", 19,  List(2.0, 2.49,3.1)), ("2", "2020-03-24", 21,  List(0.0, 2.49,3.1)), 
                ("2", "2020-03-17", 20,  List(1.0, 2.49,3.1))
            )
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF("colA","colB", "colC", "colD")
df.show()

val df1 =   df
            .withColumn("colE", collect_list("colD").over(Window.partitionBy("colA")
                                  .orderBy("colB").rangeBetween($"colC" - lit(2), $"colC")))
            .show(false)

Tags: scala, apache-spark-2.0

Solution
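
The frame bounds of rangeBetween must be literal constants; Spark cannot accept a column expression such as $"colC" - lit(2), and because the window is ordered by the string column colB, the bounds are also cast to string, which is what the error message shows. A minimal sketch of one workaround, assuming the frame should cover the current row's colC and the value just below it (that is what the expected output shows; widen the literal offset to 2 if the colC - 2 bound from the attempt is what is actually intended): order by the numeric colC in descending order and pass constant offsets.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// rangeBetween takes literal offsets relative to the ORDER BY column.
// With colC ordered descending, "1 following" reaches values down to
// the current row's colC - 1, so the frame covers colC in
// [current - 1, current].
val w = Window
  .partitionBy("colA")
  .orderBy($"colC".desc)
  .rangeBetween(0, 1)

val df1 = df.withColumn("colE", collect_list("colD").over(w))
df1.show(false)

Reusing df from the snippet above, this reproduces the expected table, with the current row's colD listed first because the frame is ordered by colC descending.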

