apache-spark - 如何从 mapPartitions 迭代器创建 DataFrame?
问题描述
我有一个带有id
列的 DataFrame。我想对每个 id 的行进行一些计算(不仅是聚合),并输出一个新的 DataFrame,每个 id 一行,包含计算结果。
我试图通过对 id 重新分区然后使用来做到这一点mapPartitions
:
df.repartition(col("id")).mapPartitions(iter => {
val dfSubset = // iter to DataFrame?
// Computations on dfSubset
})
但是你如何创建一个 DataFrame iter
?dfSubset
目标是然后在包含 id 的所有行的 DataFrame 上进行计算。
编辑:
repartition(col("id"))
不会为每个创建 1 个分区id
。我们应该groupBy("id")
改用。
解决方案
您正在寻找的是做一个groupBy
onid
然后定义您自己的User Defined Aggregate Function。如果您需要所有列,您可以构建所述列的结构并传递给您的聚合函数。
df
.groupBy("id")
.agg(myUdaf(struct(df.columns.filter(_ != "id").map(col(_)):_*)).as("result")).show()
推荐阅读
- node.js - 在 nodejs 上使用 javascript 开发“分类”应用程序
- spring - Spring Batch - 从另一个具有重试机制的Tasklet调用Tasklet?
- python - 自动检测图像和视频文件以进行进一步处理
- java - Browsermob 在 IntelliJ 创建的 jar 文件中导致错误
- python - 在普通字典中设置有序字典
- postgresql-9.4 - Postgres 服务器未启动
- c# - 引用时获取类的原始命名空间
- android - 在android oreo中设备重启后PeriodicWorkRequest不起作用
- c# - OpenXML 合并 Word 文档 多个文件的格式
- security - 是否可以桥接两个 WebSphere MQ 消息传递总线?