PySpark: show the column(s) with the lowest value in each row

Problem description

I have the following dataframe:


from pyspark.sql import SparkSession, Row

df_old_list = [
    {"Col1": "0", "Col2": "7", "Col3": "8", "Col4": "", "Col5": "20"},
    {"Col1": "5", "Col2": "5", "Col3": "5", "Col4": "", "Col5": "28"},
    {"Col1": "-1", "Col2": "-1", "Col3": "13", "Col4": "", "Col5": "83"},
    {"Col1": "-1", "Col2": "6", "Col3": "6", "Col4": "", "Col5": "18"},
    {"Col1": "5", "Col2": "4", "Col3": "2", "Col4": "", "Col5": "84"},
    {"Col1": "0", "Col2": "0", "Col3": "14", "Col4": "7", "Col5": "86"},
]

spark = SparkSession.builder.getOrCreate()
df_old_list = spark.createDataFrame(Row(**x) for x in df_old_list)
df_old_list.show()

+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
|   0|   7|   8|    |  20|
|   5|   5|   5|    |  28|
|  -1|  -1|  13|    |  83|
|  -1|   6|   6|    |  18|
|   5|   4|   2|    |  84|
|   0|   0|  14|   7|  86|
+----+----+----+----+----+

I want to get the lowest value across all the columns for each row.

This is what I have managed so far:

df1 = df_old_list.selectExpr("*", "array_sort(split(concat_ws(',',*),','))[0] lowest_col")

df1.show()

+----+----+----+----+----+----------+
|Col1|Col2|Col3|Col4|Col5|lowest_col|
+----+----+----+----+----+----------+
|   0|   7|   8|    |  20|          |
|   5|   5|   5|    |  28|          |
|  -1|  -1|  13|    |  83|          |
|  -1|   6|   6|    |  18|          |
|   5|   4|   2|    |  84|          |
|   0|   0|  14|   7|  86|         0|
+----+----+----+----+----+----------+

The problem is that Col4 is blank, so the minimum cannot be computed: the values are compared as strings, and the empty string sorts before every digit, so the blank Col4 always comes out first (see the quick check after the expected output). What I am looking for is to get the lowest value regardless of blank columns, and, if several columns share the lowest number, to have their field names shown concatenated under the lowest-columns title, like this:

+-----------------+----------+----+----+----+----+----+
|lowest_cols_title|lowest_col|Col1|Col2|Col3|Col4|Col5|
+-----------------+----------+----+----+----+----+----+
|             Col1|         0|   0|   7|   8|    |  20|
|   Col1;Col2;Col3|         5|   5|   5|   5|    |  28|
|        Col1;Col2|        -1|  -1|  -1|  13|    |  83|
|             Col1|        -1|  -1|   6|   6|    |  18|
|             Col3|         5|   5|   4|   2|    |  84|
|        Col1;Col2|         0|   0|   0|  14|   7|  86|
+-----------------+----------+----+----+----+----+----+
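For context, a quick check (my own reproduction, using the first row's values) of why the attempt returns blanks: concat_ws produces strings, and array_sort orders them lexicographically, so '' sorts before every digit, and '20' before '7':

spark.sql("SELECT array_sort(array('0', '7', '8', '', '20')) AS s").show(truncate=False)
# +---------------+
# |s              |
# +---------------+
# |[, 0, 20, 7, 8]|
# +---------------+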

Tags: python, dataframe, apache-spark, pyspark

Solution


You can use pyspark.sql.functions.least:

Returns the least value of the list of column names, skipping null values. This function takes at least 2 parameters. It will return null if all parameters are null.
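A minimal sketch of that behavior on a throwaway dataframe (the demo table and column names here are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import least

spark = SparkSession.builder.getOrCreate()

# Toy data: one row with a null in the middle, one row that is all null.
demo = spark.createDataFrame([(1, None, 3), (None, None, None)], "a int, b int, c int")

# least() skips nulls and only returns null when every input is null.
demo.select(least("a", "b", "c").alias("m")).show()
# +----+
# |   m|
# +----+
# |   1|
# |null|
# +----+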

Once we have the row minimum, we can compare it against every column and build another column out of the names that match.

Create the dataframe:

from pyspark.sql import Row
from pyspark.sql.functions import col, least, when, array, concat_ws

df_old_list = [
    {"Col1": "0", "Col2": "7", "Col3": "8", "Col4": "", "Col5": "20"},
    {"Col1": "5", "Col2": "5", "Col3": "5", "Col4": "", "Col5": "28"},
    {"Col1": "-1", "Col2": "-1", "Col3": "13", "Col4": "", "Col5": "83"},
    {"Col1": "-1", "Col2": "6", "Col3": "6", "Col4": "", "Col5": "18"},
    {"Col1": "5", "Col2": "4", "Col3": "2", "Col4": "", "Col5": "84"},
    {"Col1": "0", "Col2": "0", "Col3": "14", "Col4": "7", "Col5": "86"},
]
df = spark.createDataFrame(Row(**x) for x in df_old_list)

Compute the row-wise minimum and collect all the columns that hold it:

collist = df.columns

# Treat blank strings as +infinity so they never win, and cast the rest to
# int so the comparison is numeric rather than lexicographic.
min_ = least(*[
    when(col(c) == "", float("inf")).otherwise(col(c).cast('int'))
    for c in df.columns
]).alias("lowest_col")

df = df.select("*", min_)

# Keep each column's name where it equals the row minimum (null otherwise);
# concat_ws then joins the surviving names with ';' and drops the nulls.
df = df.select("*", concat_ws(";", array([
        when(col(c) == col("lowest_col"), c).otherwise(None)
        for c in collist
    ])).alias("lowest_cols_title"))

df.show(10, False)

Output:

+----+----+----+----+----+----------+-----------------+
|Col1|Col2|Col3|Col4|Col5|lowest_col|lowest_cols_title|
+----+----+----+----+----+----------+-----------------+
|0   |7   |8   |    |20  |0.0       |Col1             |
|5   |5   |5   |    |28  |5.0       |Col1;Col2;Col3   |
|-1  |-1  |13  |    |83  |-1.0      |Col1;Col2        |
|-1  |6   |6   |    |18  |-1.0      |Col1             |
|5   |4   |2   |    |84  |2.0       |Col3             |
|0   |0   |14  |7   |86  |0.0       |Col1;Col2        |
+----+----+----+----+----+----------+-----------------+
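The minimum comes back as a double because of the float("inf") sentinel. If the integer layout from the question is wanted, a possible final touch (a sketch reusing the df and collist built above) is to cast it back and reorder the columns:

result = df.select(
    "lowest_cols_title",
    col("lowest_col").cast("int").alias("lowest_col"),
    *collist,
)
result.show(10, False)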
