scala - 是否可以强制 UDF 在 spark 中按行顺序(按组)执行?
问题描述
我想将任意 scala 窗口感知聚合函数应用于数据集。例如,将“列”的最后 3 行相乘(愚蠢但简单的示例)。
其中许多可以转换为本机 spark 函数,但不是全部(外部库等) - 我不想必须这样做,因为它们在其他应用程序中按原样使用(并且只需要在 spark 中复制)。这些函数中的每一个都折叠/聚合新行并返回一个新值(对于该行,丢弃任何超出范围的行)。
如何按组有效地按行顺序调用这些任意函数?(显然在一个分区内 - 我认为它不能跨分区工作)。
例如:
trait UpdatingFunction {
/* adds new value, discards oldest value and calculates result */
def newValue(i: Int): Int
}
class AddValue2RowsPrior extends UpdatingFunction {...}
df.select(col("value"), udf(new AddValue2RowsPrior)(col("value"))).show()
会产生类似的东西
价值 | AddValue2RowsPrior |
---|---|
1 | 1 |
2 | 2 |
3 | 4 //(3+1) |
4 | 6 //(4+2) |
5 | 8 |
我目前的想法是(欢迎评论):
- 具有组和排序的基于动态类的 UDF 可能不需要任何额外的东西就可以工作
- 将
value
列转换为数组并作为(非常大的)行处理 - 具有数组/数据集输出值的 UDAF?
RDD.map
- 使用窗口,只使用函数作为对每个窗口进行操作的 UDAF,有效地丢弃了折叠机制。(吸引人,因为一天的电脑时间比我的时间便宜 - 但是哦,太烦人了)
解决方案
在这个阶段不是一个答案,但我在短期内必须:
mapPartitions(Iterator[T] => Iterator[U])
- 行将按指定的顺序排列,迭代器可以用 a 包装LazyList
并按您喜欢的方式处理。在这种情况下,您需要检测自己的窗口边界。
...在未来的某个时候,我会尝试明确地回答最初的问题,我保证;)。
推荐阅读
- jquery - 关闭模式后多次加载jQuery ajax
- mysql - 多个控件字段在 node.js 应用程序中不起作用
- java - 使用 Jhipster JDL 时出现 QueryService 错误
- c# - 根据具有属性的属性复制派生类的实例
- angular - 在primeng计划中初始化后更改“validRange”属性
- ruby - 我刚刚回到过去
- node.js - 无法使用反应功能调用 express app.post
- python-2.7 - 如何使用 Robotframework 自动化 iframe 中的文本字段
- reactjs - 子组件看不到父组件状态更新
- windows - 将 __int64 转换为 FileTime