python - Pyspark 显示每行具有最低值的列
问题描述
我有以下数据框
df_old_list= [
{ "Col1":"0", "Col2" : "7","Col3": "8", "Col4" : "","Col5": "20"},
{"Col1":"5", "Col2" : "5","Col3": "5", "Col4" : "","Col5": "28"},
{ "Col1":"-1", "Col2" : "-1","Col3": "13", "Col4" : "","Col5": "83"},
{"Col1":"-1", "Col2" : "6","Col3": "6", "Col4" : "","Col5": "18"},
{ "Col1":"5", "Col2" : "4","Col3": "2", "Col4" : "","Col5": "84"},
{ "Col1":"0", "Col2" : "0","Col3": "14", "Col4" : "7","Col5": "86"}
]
spark = SparkSession.builder.getOrCreate()
df_old_list = spark.createDataFrame(Row(**x) for x in df_old_list)
df_old_list.show()
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
| 0| 7| 8| | 20|
| 5| 5| 5| | 28|
| -1| -1| 13| | 83|
| -1| 6| 6| | 18|
| 5| 4| 2| | 84|
| 0| 0| 14| 7| 86|
+----+----+----+----+----+
我想从每一行的每一列中获取最低值。
这是我迄今为止能够实现的
df1=df_old_list.selectExpr("*","array_sort(split(concat_ws(',',*),','))[0] lowest_col")
df1.show()
+----+----+----+----+----+----------+
|Col1|Col2|Col3|Col4|Col5|lowest_col|
+----+----+----+----+----+----------+
| 0| 7| 8| | 20| |
| 5| 5| 5| | 28| |
| -1| -1| 13| | 83| |
| -1| 6| 6| | 18| |
| 5| 4| 2| | 84| |
| 0| 0| 14| 7| 86| 0|
+----+----+----+----+----+----------+
问题是 Col4 是空白的,因此它无法计算最小值。我正在寻找的是这样的:无论空白列如何,我都会得到最低值,并且如果有多个最低数字,则该列字段名称应以连接的形式显示在最低列标题中。
+-----------------+----------+----+----+----+----+----+
|lowest_cols_title|lowest_col|Col1|Col2|Col3|Col4|Col5|
+-----------------+----------+----+----+----+----+----+
| Col1| 0| 0| 7| 8| | 20|
| Col1;Col2;Col3| 5| 5| 5| 5| | 28|
| Col1;Col2| -1| -1| -1| 13| | 83|
| Col1| -1| -1| 6| 6| | 18|
| Col3| 5| 5| 4| 2| | 84|
| Col1;Col2| 0| 0| 0| 14| 7| 86|
+-----------------+----------+----+----+----+----+----+
解决方案
您可以使用pyspark.sql.functions.least
返回列名列表的最小值,跳过空值。此函数至少需要 2 个参数。如果所有参数都为空,它将返回空。
一旦我们有了最小列,我们就可以将最小值与所有列进行比较并创建另一列。
创建数据框:
from pyspark.sql import Row
from pyspark.sql.functions import col,least,when,array,concat_ws
df_old_list= [
{ "Col1":"0", "Col2" : "7","Col3": "8", "Col4" : "","Col5": "20"}, {"Col1":"5", "Col2" : "5","Col3": "5", "Col4" : "","Col5": "28"},
{ "Col1":"-1", "Col2" : "-1","Col3": "13", "Col4" : "","Col5": "83"}, {"Col1":"-1", "Col2" : "6","Col3": "6", "Col4" : "","Col5": "18"},
{ "Col1":"5", "Col2" : "4","Col3": "2", "Col4" : "","Col5": "84"}, { "Col1":"0", "Col2" : "0","Col3": "14", "Col4" : "7","Col5": "86"}]
df = spark.createDataFrame(Row(**x) for x in df_old_list)
from pyspark.sql.functions import least, when
计算逐行最小值和具有最小值的所有列。
collist = df.columns
min_ = least(*[
when(col(c) == "", float("inf")).otherwise(col(c).cast('int'))
for c in df.columns
]).alias("lowest_col")
df = df.select("*", min_)
df = df.select("*",concat_ws(";",array([
when(col(c)==col("lowest_col") ,c).otherwise(None)
for c in collist
])).alias("lowest_cols_title") )
df.show(10,False)
输出:
+----+----+----+----+----+----------+-----------------+
|Col1|Col2|Col3|Col4|Col5|lowest_col|lowest_cols_title|
+----+----+----+----+----+----------+-----------------+
|0 |7 |8 | |20 |0.0 |Col1 |
|5 |5 |5 | |28 |5.0 |Col1;Col2;Col3 |
|-1 |-1 |13 | |83 |-1.0 |Col1;Col2 |
|-1 |6 |6 | |18 |-1.0 |Col1 |
|5 |4 |2 | |84 |2.0 |Col3 |
|0 |0 |14 |7 |86 |0.0 |Col1;Col2 |
+----+----+----+----+----+----------+-----------------+
推荐阅读
- c# - 在数据库中更新时未保存嵌套的拥有类型
- visual-studio-code - 如何在 Ubuntu 中安装 VSCode 扩展
- r - 更改直方图比例
- c - 我在 C 中的数组的函数调用中遇到错误?
- python - 尝试在熊猫中使用聚合求和函数
- python - ReadTimoutError 在 selenium 中打开多个 webdrivers
- node.js - SPDY + Express:从服务器下载的 zip 文件已损坏
- python - matplotlib 交换 x 和 y 轴
- docker - 每当我运行 Docker 容器时获取“/usr/bin/python: No module named”
- c++ - C++ 池分配器程序仅在控制台关闭时崩溃