首页 > 解决方案 > 如何动态知道 pySpark DF 对于给定列是否具有空值/空值?

问题描述

我必须检查传入的数据是否具有任何或null值。我必须检查的列不固定。我正在从一个配置中读取,其中为具有允许的空能力的不同文件存储列名。""" "

+----------+------------------+--------------------------------------------+
| FileName |     Nullable     |                  Columns                   |
+----------+------------------+--------------------------------------------+
| Sales    | Address2,Phone2  | OrderID,Address1,Address2,Phone1,Phone2    |
| Invoice  | Bank,OfcAddress  | InvoiceNo,InvoiceID,Amount,Bank,OfcAddress |
+----------+------------------+--------------------------------------------+

因此,对于每个数据/文件,我必须查看哪个字段不应该包含null. 基于该过程/错误输出文件。有什么pythonic方法可以做到这一点吗?

标签: python-3.xdataframepyspark

解决方案


您显示的表结构让我相信您已将包含这些作业详细信息的文件作为 Spark DataFrame 阅读。你可能不应该,因为它很可能不是大数据。如果您将其作为 Spark DataFrame,collect将其发送到驱动程序,以便您可以为每个文件创建单独的 Spark 作业。

然后,每个作业都相当简单:您必须从某个文件位置读取。我想,该信息是由 捕获的FileName。现在,我还将假设每个文件的文件格式是相同的。如果没有,您必须添加指示文件格式的元数据。现在,我假设它是 CSV。

接下来,您必须确定需要检查是否存在空值的列子集。这很简单:假设您有一个 DataFrame 中所有列的列表(它可能是从上一步(加载)生成的 DataFrame 派生的)和一个可以包含空值的所有列的列表,列的列表不能包含空值只是这两者之间的区别。

最后,您agg在 DataFrame 上重新计算这些列中的空值数量。由于这是一个 DataFrame 聚合,因此结果集中只有一行,因此您可以head将其带到驱动程序中。Cast 是为了更容易访问属性的字典。

我添加了一个函数 ,summarize_positive_counts它返回至少找到一个空记录的列,从而使原始表中的声明无效。

df.show(truncate=False)
# +--------+---------------+------------------------------------------+
# |FileName|Nullable       |Columns                                   |
# +--------+---------------+------------------------------------------+
# |Sales   |Address2,Phone2|OrderID,Address1,Address2,Phone1,Phone2   |
# |Invoice |Bank,OfcAddress|InvoiceNo,InvoiceID,Amount,Bank,OfcAddress|
# +--------+---------------+------------------------------------------+

jobs = df.collect()  # bring it to the driver, to create new Spark jobs from its 

from pyspark.sql.functions import col, sum as spark_sum


def report_null_counts(frame, job):
    cols_to_verify_not_null = (set(job.Columns.split(","))
                               .difference(job.Nullable.split(",")))
    null_counts = frame.agg(*(spark_sum(col(_).isNull().cast("int")).alias(_)
                              for _ in cols_to_verify_not_null))
    return null_counts.head().asDict()


def summarize_positive_counts(filename, null_counts):
    return {filename: [colname for colname, nbr_of_nulls in null_counts.items()
                       if nbr_of_nulls > 0]}


for job in jobs:  # embarassingly parallellizable
    frame = spark.read.csv(job.FileName, header=True)
    null_counts = report_null_counts(frame, job)
    print(summarize_positive_counts(job.FileName, null_counts))


推荐阅读