首页 > 解决方案 > 是否可以强制 UDF 在 spark 中按行顺序(按组)执行?

问题描述

我想将任意 scala 窗口感知聚合函数应用于数据集。例如,将“列”的最后 3 行相乘(愚蠢但简单的示例)。

其中许多可以转换为本机 spark 函数,但不是全部(外部库等) - 我不想必须这样做,因为它们在其他应用程序中按原样使用(并且只需要在 spark 中复制)。这些函数中的每一个都折叠/聚合新行并返回一个新值(对于该行,丢弃任何超出范围的行)。

如何按组有效地按行顺序调用这些任意函数?(显然在一个分区内 - 我认为它不能跨分区工作)。

例如:

trait UpdatingFunction {
  /* adds new value, discards oldest value and calculates result */
  def newValue(i: Int): Int
}

class AddValue2RowsPrior extends UpdatingFunction {...}

df.select(col("value"), udf(new AddValue2RowsPrior)(col("value"))).show()

会产生类似的东西

价值 AddValue2RowsPrior
1 1
2 2
3 4 //(3+1)
4 6 //(4+2)
5 8

我目前的想法是(欢迎评论):

标签: scalaapache-sparkaggregate-functionsuser-defined-functions

解决方案


在这个阶段不是一个答案,但我在短期内必须:

mapPartitions(Iterator[T] => Iterator[U])- 行将按指定的顺序排列,迭代器可以用 a 包装LazyList并按您喜欢的方式处理。在这种情况下,您需要检测自己的窗口边界。

...在未来的某个时候,我会尝试明确地回答最初的问题,我保证;)。


推荐阅读