python-3.x - 如何动态知道 pySpark DF 对于给定列是否具有空值/空值?
问题描述
我必须检查传入的数据是否具有任何或null
值。我必须检查的列不固定。我正在从一个配置中读取,其中为具有允许的空能力的不同文件存储列名。""
" "
+----------+------------------+--------------------------------------------+
| FileName | Nullable | Columns |
+----------+------------------+--------------------------------------------+
| Sales | Address2,Phone2 | OrderID,Address1,Address2,Phone1,Phone2 |
| Invoice | Bank,OfcAddress | InvoiceNo,InvoiceID,Amount,Bank,OfcAddress |
+----------+------------------+--------------------------------------------+
因此,对于每个数据/文件,我必须查看哪个字段不应该包含null
. 基于该过程/错误输出文件。有什么pythonic方法可以做到这一点吗?
解决方案
您显示的表结构让我相信您已将包含这些作业详细信息的文件作为 Spark DataFrame 阅读。你可能不应该,因为它很可能不是大数据。如果您将其作为 Spark DataFrame,collect
将其发送到驱动程序,以便您可以为每个文件创建单独的 Spark 作业。
然后,每个作业都相当简单:您必须从某个文件位置读取。我想,该信息是由 捕获的FileName
。现在,我还将假设每个文件的文件格式是相同的。如果没有,您必须添加指示文件格式的元数据。现在,我假设它是 CSV。
接下来,您必须确定需要检查是否存在空值的列子集。这很简单:假设您有一个 DataFrame 中所有列的列表(它可能是从上一步(加载)生成的 DataFrame 派生的)和一个可以包含空值的所有列的列表,列的列表不能包含空值只是这两者之间的区别。
最后,您agg
在 DataFrame 上重新计算这些列中的空值数量。由于这是一个 DataFrame 聚合,因此结果集中只有一行,因此您可以head
将其带到驱动程序中。Cast 是为了更容易访问属性的字典。
我添加了一个函数 ,summarize_positive_counts
它返回至少找到一个空记录的列,从而使原始表中的声明无效。
df.show(truncate=False)
# +--------+---------------+------------------------------------------+
# |FileName|Nullable |Columns |
# +--------+---------------+------------------------------------------+
# |Sales |Address2,Phone2|OrderID,Address1,Address2,Phone1,Phone2 |
# |Invoice |Bank,OfcAddress|InvoiceNo,InvoiceID,Amount,Bank,OfcAddress|
# +--------+---------------+------------------------------------------+
jobs = df.collect() # bring it to the driver, to create new Spark jobs from its
from pyspark.sql.functions import col, sum as spark_sum
def report_null_counts(frame, job):
cols_to_verify_not_null = (set(job.Columns.split(","))
.difference(job.Nullable.split(",")))
null_counts = frame.agg(*(spark_sum(col(_).isNull().cast("int")).alias(_)
for _ in cols_to_verify_not_null))
return null_counts.head().asDict()
def summarize_positive_counts(filename, null_counts):
return {filename: [colname for colname, nbr_of_nulls in null_counts.items()
if nbr_of_nulls > 0]}
for job in jobs: # embarassingly parallellizable
frame = spark.read.csv(job.FileName, header=True)
null_counts = report_null_counts(frame, job)
print(summarize_positive_counts(job.FileName, null_counts))
推荐阅读
- java - 包 org.jetbrains.annotations 不存在
- python - 禁止打印来自 Azure 队列存储的响应
- laravel - 如何在laravel中显示选择字段的错误消息
- google-smart-home - Google 助理是否会处理以特定语言给出的同义词/值的任何翻译?
- php - 从 PHP Web 应用程序连接到本地 Active Directory
- reactjs - 从项目的 Github 存储库中提取新 udates 后出现 Graphql 错误
- c# - 按钮单击时颜色更改文本
- sql - 在sql中使用pivot的多行
- python - 如何在 setup.py 中指定 `--formats` sdist 选项?
- html - HTML 将页脚内容与页面包装器对齐