首页 > 解决方案 > 检查输入数据集是否包含 PySpark 中的键

问题描述

我有以下代码,如下所示。我需要检查y.lc.eoouh.ci输入源中是否存在该列并仅在存在时填充该列,否则它应该为 NULL。(键lc也是可选的)下面的代码似乎没有按预期的方式工作即使y.lc.eoouch.ci存在于输入中,它的计算结果为 NULL。

has_column实施从这里开始

df = df_s_a \
            .withColumn("ceci", \
                udf(
                    lambda y : y.lc[-1].eoouh.ci \
                        if has_column(y, 'lc.eoouh.ci') \
                            else None, \
                    StringType()
                   )(col('eh') \
                   ) \
                ) \
            .select(                    
                col('ceci')
            )
df.show()

样本输入:

{
 eh: {
   lc: [
      eoouch: {
       ci: "1234ABC"
    }
  ]
 }
}

标签: pythonapache-sparkpyspark

解决方案


df[something.path.somewhere]不起作用。我得稍微调查一下这个选项。

我设法使它像这样工作:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType


def has_column(df):
    try:
        df["lc"][0]["eoouch"]["ci"]
        return True
    except KeyError:
        return False


if __name__ == "__main__":

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    data = [
        {"eh": {"lc": [{"eoouch": {"ci": "test"}}]}},
        {"eh": {"lc": [{"eoouch": {"as": "test"}}]}},
    ]

    df = spark.createDataFrame(data)
    add_column_udf = F.udf(
        lambda y: y if has_column(y) else None,
        StringType(),
    )
    df = df.withColumn("ceci", add_column_udf(F.col("eh")))

结果:

+----------------------------------+-------------------------+                  
|eh                                |ceci                     |
+----------------------------------+-------------------------+
|{lc -> [{eoouch -> {ci -> test}}]}|{lc=[{eoouch={ci=test}}]}|
|{lc -> [{eoouch -> {as -> test}}]}|null                     |
+----------------------------------+-------------------------+

它并不完美,因为它不是列名的通用解决方案,但它可以很容易地概括,因为它适用于dict对象。


推荐阅读