首页 > 解决方案 > 获取所有列条目都为空的 groupby 数据框

问题描述

我正在使用 pyspark 2.4.5 并且有一个数据框,我已经对其进行过滤以包含所有条目作为 groupby 的一部分,这些条目包含空值

df_nulls = df.where(reduce(lambda x, y: x | y, (col(c).isNull() for c in df.columns)))

从此我想进一步过滤以删除(并获取单独的数据框)所有列都具有空值的所有条目。

目前,我能够通过检查该列的最小值和最大值是否都为空并基于此返回 1 或 0 来为一列实现此目的

agg_expression = [when((min(c).eqNullSafe(max(c))).alias(c) , 1).otherwise(0).alias(c) for c in columns]

df_run_all_nulls = df_nulls.groupby("cat1", "cat2", "cat3", "cat4").agg(*agg_expression)

然后,我可以在此数据帧上进一步过滤以获取与 null 或非 null 值相关的条目

df_run_all_nulls.where(df_run_all_nulls.col1 == 1).count()

我可以循环并获取数据集中每一列的信息(我对不同列之间所有空值的重叠感兴趣),但想知道是否有更好/更智能的方法来做这样的事情?

我还想知道是否有所有列都为空的条目。

我的初始数据的示例数据框看起来像

| cat1 | col1 | col2 | col3 | col4 |
| 1    | 1    | null | null | null | 
| 1    | 2    | null | null | null |
| 2    | 1    | 50   | 0.3  | 2    |
| 2    | 2    | 60   | 0.3  | 6    |
| 1    | 3    | null | null | null |
| 3    | 1    | null | 10   | null |
| 3    | 2    | null | 2    | 2    |
| 3    | 3    | null | 20   | 4    |

其中 cat1 表示一个分组(在我的情况下是一个正在运行的进程), col1 表示一个时间步长,它的长度可能取决于正在运行的进程,然后 cols 2 和 3 是在此过程中每个时间步长获取的传感器读数。

所以我想从上面提取两个数据帧,一个只包括所有传感器数据为空的进程,但这里会有列,默认情况下总是记录数据,所以空检查应该在列的子集上。

| cat1 | col1 | col2 | col3 | col4 |
| 1    | 1    | null | null | null | 
| 1    | 2    | null | null | null |
| 1    | 3    | null | null | null |

这里实际上只是一个唯一的 cat1 条目列表就足够了,在这种情况下 [1] (但实际上会发现更多)

然后,第二个数据帧应该只包含一些数据包含空值的进程。

| cat1 | col1 | col2 | col3 | col4 |
| 3    | 1    | null | 10   | null |
| 3    | 2    | null | 2    | 2    |
| 3    | 3    | null | 20   | 4    |

标签: pythondataframeapache-sparkpyspark

解决方案


让我们用一些Window函数试试这个:

from functools import reduce

from pyspark.sql import functions as F, Window as W


exclude_cols = ["cat1", "col1"]

df = reduce(
    lambda a, b: a.withColumn(b["colName"], b["col"]),
    [
        {
            "colName": f"{col}_grp",
            "col": F.max(F.when(F.col(col).isNotNull(), 1).otherwise(0)).over(
                W.partitionBy("cat1")
            ),
        }
        for col in df.columns
        if col not in exclude_cols
    ],
    df,
)

df.show()
+----+----+----+----+----+--------+--------+--------+
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
|   3|   1|null|10.0|null|       0|       1|       1|
|   3|   2|null| 2.0|   2|       0|       1|       1|
|   3|   3|null|20.0|   4|       0|       1|       1|
|   2|   2|  60| 0.3|   6|       1|       1|       1|
|   2|   1|  50| 0.3|   2|       1|       1|       1|
+----+----+----+----+----+--------+--------+--------+

从此数据框中,您可以使用简单的 where 选择所需的行:

# first dataframe 
df.where(
    F.greatest(*(F.col(col) for col in df.columns if col.endswith("_grp"))) == 0
).show()
+----+----+----+----+----+--------+--------+--------+                           
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
+----+----+----+----+----+--------+--------+--------+

# second one (which theoretically should include ID 1 also)
df.where(
    F.least(*(F.col(col) for col in df.columns if col.endswith("_grp"))) == 0
).show()
+----+----+----+----+----+--------+--------+--------+                           
|cat1|col1|col2|col3|col4|col2_grp|col3_grp|col4_grp|
+----+----+----+----+----+--------+--------+--------+
|   1|   1|null|null|null|       0|       0|       0|
|   1|   2|null|null|null|       0|       0|       0|
|   1|   3|null|null|null|       0|       0|       0|
|   3|   1|null|10.0|null|       0|       1|       1|
|   3|   2|null| 2.0|   2|       0|       1|       1|
|   3|   3|null|20.0|   4|       0|       1|       1|
+----+----+----+----+----+--------+--------+--------+

推荐阅读