首页 > 解决方案 > scala 聚合第一个函数给出了意想不到的结果

问题描述

我在 scala spark 中使用了一个简单的 groupby 查询,其目标是在已排序的数据框中获取组中的第一个值。这是我的火花数据框

+---------------+------------------------------------------+
|ID             |some_flag |some_type  |  Timestamp        |
+---------------+------------------------------------------+
|      656565654|      true|     Type 1|2018-08-10 00:00:00|
|      656565654|     false|     Type 1|2017-08-02 00:00:00|
|      656565654|     false|     Type 2|2016-07-30 00:00:00|
|      656565654|     false|     Type 2|2016-05-04 00:00:00|
|      656565654|     false|     Type 2|2016-04-29 00:00:00|
|      656565654|     false|     Type 2|2015-10-29 00:00:00|
|      656565654|     false|     Type 2|2015-04-29 00:00:00|
+---------------+----------+-----------+-------------------+

这是我的汇总查询

val sampleDF = df.sort($"Timestamp".desc).groupBy("ID").agg(first("Timestamp"), first("some_flag"), first("some_type"))

预期的结果是

+---------------+-------------+---------+-------------------+
|ID             |some_falg    |some_type|  Timestamp        |
+---------------+-------------+---------+-------------------+
|      656565654|         true|   Type 1|2018-08-10 00:00:00|
+---------------+-------------+---------+-------------------+

但是得到以下奇怪的输出并且它像随机行一样不断变化

+---------------+-------------+---------+-------------------+
|ID             |some_falg    |some_type|  Timestamp        |
+---------------+-------------+---------+-------------------+
|      656565654|        false|   Type 2|2015-10-29 00:00:00|
+---------------+-------------+---------+-------------------+

另请注意,数据框中没有空值。我在做错事时挠头。需要帮忙!

标签: scalaapache-sparkapache-spark-sql

解决方案


只是为了补充Vamsi的答案;问题是groupBy结果组中的值没有按任何特定顺序返回(特别是考虑到 Spark 操作的分布式性质),因此该first函数的命名可能具有误导性。它返回为该列找到的第一个非空值,即组内该列的几乎所有非空值。

在 之前对行进行排序groupBy不会以任何可重现的方式影响组内的顺序。

另请参阅此博客文章,其中解释了由于上述行为,您从多个first调用中获得的值甚至可能不是来自组内的同一行。

输入 3 列“k, t, v”的数据</p>

z, 1, null
z, 2, 1.5
z, 3, 2.4

代码:

df.groupBy("k").agg(
  $"k",
  first($"t"),
  first($"v")
)

输出:

z, 1, 1.5

这个结果是 2 条记录的混合!


推荐阅读