pyspark - 有效地计算 PySpark GroupedData 上的前 k 个元素(不是 scala)
问题描述
我有一个形式的数据框:
+---+---+----+
| A| B|dist|
+---+---+----+
| a1| b1| 1.0|
| a1| b2| 2.0|
| a2| b1|10.0|
| a2| b2|10.0|
| a2| b3| 2.0|
| a3| b1|10.0|
+---+---+----+
并且,固定max_rank=2,我想得到以下一个
+---+---+----+----+
| A| B|dist|rank|
+---+---+----+----+
| a3| b1|10.0| 1|
| a2| b3| 2.0| 1|
| a2| b1|10.0| 2|
| a2| b2|10.0| 2|
| a1| b1| 1.0| 1|
| a1| b2| 2.0| 2|
+---+---+----+----+
执行此操作的经典方法如下
df = sqlContext.createDataFrame([("a1", "b1", 1.), ("a1", "b2", 2.), ("a2", "b1", 10.), ("a2", "b2", 10.), ("a2", "b3", 2.), ("a3", "b1", 10.)], schema=StructType([StructField("A", StringType(), True), StructField("B", StringType(), True),StructField("dist", FloatType(), True)]))
win = Window().partitionBy(df['A']).orderBy(df['dist'])
out = df.withColumn('rank', rank().over(win))
out = out.filter('rank<=2')
但是,由于使用 OrderBy 的 Window 函数,此解决方案效率低下。
Pyspark 还有另一种解决方案吗?例如类似于 .top(k, key=--) 的 RDD 方法?
我在这里找到了类似的答案,但使用 scala 而不是 python。
解决方案
推荐阅读
- python - 删除所有成员角色并在一段时间后将它们返回
- java - 为什么在 Java 中解析 CSV 电子表格会引发 NumberFormatException?
- java - Java 可选
- > 到地图转换
- mongodb - MongoDb Compass 将数据库引用导出为嵌入式 JSON
- css - 1440px 宽度的媒体查询,但不是笔记本电脑屏幕?
- javascript - 电子邮件和密码不会通过美国传递到 firebase
- sql - 将年数和月数转换为月数
- android - BroadcastReceiver Android 中的 locationManager.requestLocationUpdates
- bash - 为什么在这个 git hook 示例的末尾使用 exec(似乎没有必要)?
- objective-c - 从 URLSession 传递 NSError 的正确方法