scala - Spark收集有限排序列表
问题描述
我正在尝试使用 spark 为数据框创建一个有限的排序列表,但是我想不出一种快速且低内存的方法。
我的数据框由三列、两个键 ID 和一个距离列组成,我想获取靠近每个 ID 的前 n = 50 个 ID 的列表。我尝试 groupBy 后跟 collect_list 后跟 sort_array,然后是 UDF 以仅获取 ID,最后通过 UDF 传递它以获取第一个 n = 50,但它非常慢,有时会出现内存错误。
# Sample Data
val dataFrameTest = Seq(
("key1", "key2", 1),
("key1","key3", 2),
("key1", "key5" ,4),
("key1", "key6" ,5),
("key1","key8" ,6),
("key2", "key7" ,3),
("key2", "key9" ,4),
("key2","key5" ,5)
).toDF("id1", "id2", "distance")
如果限制是 2 想要
"key1" | ["key2", "key3"]
"key2" | ["key7", "key8"]
当前方法:
sorted_df = dataFrameTest.groupBy("key1").agg(collect_list(struct("distance", "id2")).alias("toBeSortedCol")).
withColumn("sortedList", sort_array("toBeSortedCol"))
我的数据非常大,这就是为什么 spark 是唯一的解决方案。我感谢任何帮助/指导。
解决方案
为此使用 Spark SQL 窗口函数之一怎么样?就像是
scala> val dataFrameTest = Seq(
| ("key1", "key2", 1),
| ("key1","key3", 2),
| ("key1", "key5" ,4),
| ("key1", "key6" ,5),
| ("key1","key8" ,6),
| ("key2", "key7" ,3),
| ("key2", "key9" ,4),
| ("key2","key5" ,5)
| ).toDF("id1", "id2", "distance")
dataFrameTest: org.apache.spark.sql.DataFrame = [id1: string, id2: string ... 1 more field]
scala> dataFrameTest.createOrReplaceTempView("sampledata")
scala> spark.sql("""
| select t.id1, collect_list(t.id2) from (
| select id1, id2, row_number() over (partition by id1 order by distance) as rownum from sampledata
| )t
| where t.rownum < 3 group by t.id1
| """).show(false)
+----+-----------------+
|id1 |collect_list(id2)|
+----+-----------------+
|key1|[key2, key3] |
|key2|[key7, key9] |
+----+-----------------+
scala>
只需替换row_number()
为rank()
或dense_rank()
取决于您需要的结果类型。
推荐阅读
- mysql - 用户注册后,将来自 amazon cognito 的用户数据从 laravel 同步到我的 mysql 数据库
- javascript - 在输入到输入上过滤嵌套数组
- mysql - 当 GraphQL 查询只需要返回对象的某些字段时,为什么 MySQL/Sequelize 会执行全选查询?
- ecmascript-6 - ES6: filter then map -> 有没有办法从过滤器中找出元素的数量?
- intellij-idea - IntelliJ Idea 在不应该的情况下创建了额外的 .iml 文件
- python - selenium python中存在元素时如何运行一段代码?
- python - PIL 和 OpenCV 的调整大小有什么区别
- c - 结构中的空指针是否比没有指针占用更多的内存?
- vba - 在 VBA 中一起使用查找和替换功能
- r - R R glm 没有收敛的后果是什么