首页 > 解决方案 > 在 pyspark 中执行 partitionBy 列时消除特定列的空值行

问题描述

我有一个这样的 pyspark 数据框:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
|333| null|   CT|
+---+-----+-----+

对于给定的 ID,我想保留该记录,即使列“name”为空,如果它的 ID 不重复,但如果 ID 重复,那么我想检查 name 列并确保它不重复包含该 ID 中的重复项,如果“名称”为空,则仅针对重复的 ID 删除。以下是所需的输出:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
+---+-----+-----+

如何在 PySpark 中实现这一点?

标签: apache-sparkpysparknullapache-spark-sqlpyspark-dataframes

解决方案


您可以通过按 id 列分组并计算每个组中的名称数来做到这一点。默认情况下,Spark 中会忽略 Null 值,因此应保留任何计数为 0 的组。我们现在可以过滤掉计数大于 0 的组中的任何空值。

在 Scala 中,这可以通过如下窗口函数来完成:

val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
  .filter($"name".isNotNull or $"gCount" === 0)
  .drop("gCount")

PySpark 等价物:

w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
  .filter((col("name").isNotNull()) | (col("gCount") == 0))
  .drop("gCount")

上面的内容不会删除对于同一 id 具有多个 null 的行(所有这些都将被保留)。

如果这些也应该被删除,只保留一行name==null,一个简单的方法是.dropDuplicates(['id','name'])在运行上述代码之前或之后使用。请注意,这也将删除任何其他重复项(在这种情况下.dropDuplicates(['id','name', 'state'])可能更可取)。


推荐阅读