首页 > 解决方案 > 在不改变窗口逻辑的情况下并行化窗口函数或加速 Spark 中的窗口函数

问题描述

我在 spark 查询中有一组窗口函数,其中包括 user_num 上的分区。其中一个 user_nums 的行数远远多于其他行数。该行在单个任务中计算,该任务具有更高的随机读取、随机远程读取,最终需要大量时间。

Select LAG(e) OVER (PARTITION BY user_num, a, date ORDER BY time) as aa,
FIRST_VALUE(e)  OVER (PARTITION BY a, date ORDER BY time ROWS
BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as bbb
FROM table

是否有任何设置或任何方法可以让它在不同的任务上运行,或者以不需要或最小更改窗口函数逻辑的方式缩短这段时间?

IE 我可以在某个点缓存,增加分区数,增加 exec mem 等。

标签: apache-sparkapache-spark-sql

解决方案


通过牺牲一些数据损失来扩大规模的一个建议,即通过增加 PARTITION 窗口的粒度

示例

  • 将“HOUR”值从时间中提取到新列中。
  • 现在 PARTITION BY user_num, a, date, HOUR

现在,您的单个任务将分为 24 个任务

但这将为您的 LAG带来更多 NULLS。例如,如果您正在计算偏移量为 4 的滞后,那么对于每一小时的数据,您将有 3 个 NULL 值。

因此,这就像速度与准确数据之间的权衡


您可以进一步将粒度增加到分钟,以增加更多的并行性,但代价是更多的 NULL。


损失方面的进一步改善:

如果您选择使用较低粒度,这可能会影响其他“user_nums”。最好分成 2 个 dfs :

  • 分成 2 个 dfs(按计数使用一个组,并有一个线程保持,如果你认为它是固定的,那么 user_num 可以考虑倾斜或者你可以硬编码。)
  • One_with_Skewed_Data
  • One_with_normal_Data
  • result_from_Skew = One_with_Skewed_Data ..具有较低粒度的窗口查询
  • result_from_normal = One_with_normal_Data .. 与您的常规查询

结果 = result_from_Skew.union(result_from_normal)


推荐阅读