apache-spark - Spark 使用自定义函数聚合行
问题描述
为简单起见,假设我们有一个包含以下数据的数据框:
+----------+---------+----------+----------+
|firstName |lastName |Phone |Address |
+----------+---------+----------+----------+
|firstName1|lastName1|info1 |info2 |
|firstName1|lastName1|myInfo1 |dummyInfo2|
|firstName1|lastName1|dummyInfo1|myInfo2 |
+----------+---------+----------+----------+
如何合并按 (firstName,lastName) 分组的所有行,并保留以“my”开头的仅电话和地址列中的数据以获得以下信息:
+----------+---------+----------+----------+
|firstName |lastName |Phone |Address |
+----------+---------+----------+----------+
|firstName1|lastName1|myInfo1 |myInfo2 |
+----------+---------+----------+----------+
也许我应该将 agg 函数与自定义 UDAF 一起使用?但是我该如何实现呢?
注意:我使用 Spark 2.2 和 Scala 2.11。
解决方案
您可以使用groupBy
和collect_set
聚合函数并使用udf
函数过滤以“my”开头的第一个字符串
import org.apache.spark.sql.functions._
def myudf = udf((array: Seq[String]) => array.filter(_.startsWith("my")).head)
df.groupBy("firstName ", "lastName")
.agg(myudf(collect_set("Phone")).as("Phone"), myudf(collect_set("Address")).as("Address"))
.show(false)
这应该给你
+----------+---------+-------+-------+
|firstName |lastName |Phone |Address|
+----------+---------+-------+-------+
|firstName1|lastName1|myInfo1|myInfo2|
+----------+---------+-------+-------+
我希望答案有帮助
推荐阅读
- xmlhttprequest - 通过自定义图像上传适配器上传后,我的没有 src 属性
- metabase - 数字过滤器不起作用,=,<,> 运算符过滤器不起作用
- symfony4 - Symfony4。使用独白还是不使用?
- node.js - Mongoose findOneAndUpdate 更新错误的实体
- python - 为什么我们不将参数传递给 read() 函数?
- python - 有什么方法可以检查 Python Socket 中是否有传入消息
- python - PyCrypto 签名验证与其他语言一起失败
- java - 当我有特定的坐标来检测球和砖块之间的碰撞时,为什么球对象会与不可见的墙壁发生碰撞?
- apache-kafka - zookeeper.session.timeout.ms 是否适用于消费者或代理?
- python - 根据某些条件在python中将连续列转换为二进制