首页 > 解决方案 > PySpark 高效方式 N 最大元素

问题描述

我的数据集 所以我必须从这个数据集中得到 n 个(默认 3 个)最大的元素。如何以可接受的方式在 PySpark 中执行此操作?我知道如何在 Pandas 中做到这一点,但我想知道如何在 PySpark 中有效地完成它,或者它是否可以有效地完成。我的第一个想法是像这样使用 pyspark.sql.functions 中的最大

ls = []
cols = df_tmp.columns[:-1]
for j in cols:
        max_v = df_tmp.where(df_tmp["Variable"] == j).select(F.greatest(*[F.col(col) for col in cols]))
        ls.append(max_v.collect()[0][0])
return ls.max

但这似乎是一种非常糟糕的方法,因为它返回了最大的值(0.984)而不是组合(Charlie,Foxtrot)。此外,如果不重写单元格中的值(Charlie,Foxtrot),我看不到如何获得第二大值,这是我认为你不应该在 PySpark 中做的事情。

感谢您阅读本文,特别是感谢所有可能回答的人:)

标签: pysparkmax

解决方案


您可以合并从 Alpha 到 Foxtrot 的所有列,以创建具有三列(数值、变量列、值的列名)的数据框。请看下面的例子:

import random
#creating a dataframe similiar to yours
columns = ['A','B','C','D','E','F']
l = [[random.random() if c!=r else None for c in range(6)] for r in range(6)]
l = [x + [columns[i]] for i,x in enumerate(l)]

df=spark.createDataFrame(l, columns)
df.show()

输出:


+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
|                  A|                   B|                   C|                   D|                  E|                  F| _7|
+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
|               null| 0.37958341713258026| 0.31880755415785833|  0.8908555547489883|0.41632799280431776| 0.0729721304772899|  A|
|0.21814744903713268|                null|0.024393462170815394|  0.9940573571339111| 0.7841527980918188|  0.194722179975509|  B|
|  0.786507586894131|  0.9155528558183477|                null|  0.5782381547037391| 0.9714912596343181| 0.5446460767903856|  C|
| 0.9108497603580163|  0.5088891113970719| 0.35594300627798736|                null|  0.514258802933162|0.19317616393798986|  D|
|  0.193214269992123|  0.6259176088252493|  0.4425532657461867|0.050484163355697276|               null| 0.6594661109179668|  E|
| 0.5567272189587709|0.020606558131312402| 0.21905184240270814|  0.2817064382900445| 0.5409619970394691|               null|  F|
+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
import pyspark.sql.functions as F

newdf = df.select(F.col('A').alias('value'), F.col('_7').alias('row'), F.lit('A').alias('column'))

for col in columns[1:]:
    newdf = newdf.union(df.select(col, '_7', F.lit(col)))

newdf.orderBy(newdf.value.desc()).show(3)

输出:

+------------------+---+------+
|             value|row|column|
+------------------+---+------+
|0.9940573571339111|  B|     D|
|0.9714912596343181|  C|     E|
|0.9155528558183477|  C|     B|
+------------------+---+------+

推荐阅读