首页 > 解决方案 > 如何从 mapPartitions 迭代器创建 DataFrame?

问题描述

我有一个带有id列的 DataFrame。我想对每个 id 的行进行一些计算(不仅是聚合),并输出一个新的 DataFrame,每个 id 一行,包含计算结果。

我试图通过对 id 重新分区然后使用来做到这一点mapPartitions

df.repartition(col("id")).mapPartitions(iter => {
    val dfSubset = // iter to DataFrame?
    // Computations on dfSubset
})

但是你如何创建一个 DataFrame iterdfSubset目标是然后在包含 id 的所有行的 DataFrame 上进行计算。

编辑:

repartition(col("id"))不会为每个创建 1 个分区id。我们应该groupBy("id")改用。

标签: apache-sparkapache-spark-sql

解决方案


您正在寻找的是做一个groupByonid然后定义您自己的User Defined Aggregate Function。如果您需要所有列,您可以构建所述列的结构并传递给您的聚合函数。

df
.groupBy("id")
.agg(myUdaf(struct(df.columns.filter(_ != "id").map(col(_)):_*)).as("result")).show()

推荐阅读